You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/08/31 06:05:03 UTC

[apisix] branch master updated: feat: add elasticsearch-logger (#7643)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new fd1411c82 feat: add elasticsearch-logger (#7643)
fd1411c82 is described below

commit fd1411c82e362c43f9c3aa2fc5c9faba0a8ef78e
Author: wangchengcheng <35...@users.noreply.github.com>
AuthorDate: Wed Aug 31 14:04:53 2022 +0800

    feat: add elasticsearch-logger (#7643)
    
    Co-authored-by: tzssangglass <tz...@gmail.com>
    Co-authored-by: Fei Han <97...@users.noreply.github.com>
---
 apisix/plugins/elasticsearch-logger.lua        | 176 ++++++++++
 ci/pod/docker-compose.plugin.yml               |  27 ++
 conf/config-default.yaml                       |   1 +
 docs/en/latest/config.json                     |   3 +-
 docs/en/latest/plugins/elasticsearch-logger.md | 287 ++++++++++++++++
 docs/zh/latest/README.md                       |   2 +-
 docs/zh/latest/config.json                     |   3 +-
 docs/zh/latest/plugins/elasticsearch-logger.md | 278 +++++++++++++++
 t/admin/plugins.t                              |   1 +
 t/plugin/elasticsearch-logger.t                | 453 +++++++++++++++++++++++++
 10 files changed, 1228 insertions(+), 3 deletions(-)

diff --git a/apisix/plugins/elasticsearch-logger.lua b/apisix/plugins/elasticsearch-logger.lua
new file mode 100644
index 000000000..105cbe4d9
--- /dev/null
+++ b/apisix/plugins/elasticsearch-logger.lua
@@ -0,0 +1,176 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local log_util        = require("apisix.utils.log-util")
+local bp_manager_mod  = require("apisix.utils.batch-processor-manager")
+local plugin          = require("apisix.plugin")
+
+local ngx             = ngx
+local str_format      = core.string.format
+
+local plugin_name = "elasticsearch-logger"
+local batch_processor_manager = bp_manager_mod.new(plugin_name)
+
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = {
+            type = "string",
+            pattern = "[^/]$",
+        },
+        field = {
+            type = "object",
+            properties = {
+                index = { type = "string"},
+                type = { type = "string"}
+            },
+            required = {"index"}
+        },
+        auth = {
+            type = "object",
+            properties = {
+                username = {
+                    type = "string",
+                    minLength = 1
+                },
+                password = {
+                    type = "string",
+                    minLength = 1
+                },
+            },
+            required = {"username", "password"},
+        },
+        timeout = {
+            type = "integer",
+            minimum = 1,
+            default = 10
+        },
+        ssl_verify = {
+            type = "boolean",
+            default = true
+        }
+    },
+    required = { "endpoint_addr", "field" },
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 413,
+    name = plugin_name,
+    schema = batch_processor_manager:wrap_schema(schema),
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function get_logger_entry(conf, ctx)
+    local entry
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+    if metadata and metadata.value.log_format
+        and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+        core.log.info("custom log format entry: ", core.json.delay_encode(entry))
+    else
+        entry = log_util.get_full_log(ngx, conf)
+        core.log.info("full log entry: ", core.json.delay_encode(entry))
+    end
+
+    return core.json.encode({
+            create = {
+                _index = conf.field.index,
+                _type = conf.field.type
+            }
+        }) .. "\n" ..
+        core.json.encode(entry) .. "\n"
+end
+
+
+local function send_to_elasticsearch(conf, entries)
+    local httpc, err = http.new()
+    if not httpc then
+        return false, str_format("create http error: %s", err)
+    end
+
+    local uri = conf.endpoint_addr .. "/_bulk"
+    local body = core.table.concat(entries, "")
+    local headers = {["Content-Type"] = "application/x-ndjson"}
+    if conf.auth then
+        local authorization = "Basic " .. ngx.encode_base64(
+            conf.auth.username .. ":" .. conf.auth.password
+        )
+        headers["Authorization"] = authorization
+    end
+
+    core.log.info("uri: ", uri, ", body: ", body)
+
+    httpc:set_timeout(conf.timeout * 1000)
+    local resp, err = httpc:request_uri(uri, {
+        ssl_verify = conf.ssl_verify,
+        method = "POST",
+        headers = headers,
+        body = body
+    })
+    if not resp then
+        return false, err
+    end
+
+    if resp.status ~= 200 then
+        return false, str_format("elasticsearch server returned status: %d, body: %s",
+        resp.status, resp.body or "")
+    end
+
+    return true
+end
+
+
+function _M.log(conf, ctx)
+    local entry = get_logger_entry(conf, ctx)
+
+    if batch_processor_manager:add_entry(conf, entry) then
+        return
+    end
+
+    local process = function(entries)
+        return send_to_elasticsearch(conf, entries)
+    end
+
+    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, process)
+end
+
+
+return _M
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 226abd7ed..2367ccedb 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -197,6 +197,33 @@ services:
       SPLUNK_HEC_TOKEN: "BD274822-96AA-4DA6-90EC-18940FB2414C"
       SPLUNK_HEC_SSL: "False"
 
+  # Elasticsearch Logger Service
+  elasticsearch-noauth:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1
+    restart: unless-stopped
+    ports:
+      - "9200:9200"
+      - "9300:9300"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      xpack.security.enabled: 'false'
+
+  elasticsearch-auth:
+    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1
+    restart: unless-stopped
+    ports:
+      - "9201:9201"
+      - "9301:9301"
+    environment:
+      ES_JAVA_OPTS: -Xms512m -Xmx512m
+      discovery.type: single-node
+      ELASTIC_USERNAME: elastic
+      ELASTIC_PASSWORD: 123456
+      http.port: 9201
+      transport.tcp.port: 9301
+      xpack.security.enabled: 'true'
+
 
 networks:
   apisix_net:
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 89287004f..7795d8172 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -449,6 +449,7 @@ plugins:                          # plugin list (sorted by priority)
   - public-api                     # priority: 501
   - prometheus                     # priority: 500
   - datadog                        # priority: 495
+  - elasticsearch-logger           # priority: 413
   - echo                           # priority: 412
   - loggly                         # priority: 411
   - http-logger                    # priority: 410
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index 2e406005d..edb489ce4 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -149,7 +149,8 @@
                 "plugins/google-cloud-logging",
                 "plugins/splunk-hec-logging",
                 "plugins/file-logger",
-                "plugins/loggly"
+                "plugins/loggly",
+                "plugins/elasticsearch-logger"
               ]
             }
           ]
diff --git a/docs/en/latest/plugins/elasticsearch-logger.md b/docs/en/latest/plugins/elasticsearch-logger.md
new file mode 100644
index 000000000..51deead86
--- /dev/null
+++ b/docs/en/latest/plugins/elasticsearch-logger.md
@@ -0,0 +1,287 @@
+---
+title: elasticsearch-logger
+keywords:
+  - APISIX
+  - API Gateway
+  - Plugin
+  - Elasticsearch-logger
+description: This document contains information about the Apache APISIX elasticsearch-logger Plugin.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Description
+
+The `elasticsearch-logger` Plugin is used to forward logs to [Elasticsearch](https://www.elastic.co/guide/en/welcome-to-elastic/current/getting-started-general-purpose.html) for analysis and storage.
+
+When the Plugin is enabled, APISIX will serialize the request context information to [Elasticsearch Bulk format](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk) and submit it to the batch queue. When the maximum batch size is exceeded, the data in the queue is pushed to Elasticsearch. See [batch processor](../batch-processor.md) for more details.
+
+## Attributes
+
+| Name          | Type    | Required | Default                     | Description                                                  |
+| ------------- | ------- | -------- | --------------------------- | ------------------------------------------------------------ |
+| endpoint_addr | string  | True     |                             | Elasticsearch API.                                            |
+| field         | array   | True     |                             | Elasticsearch `field` configuration.                          |
+| field.index   | string  | True     |                             | Elasticsearch [_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field). |
+| field.type    | string  | False    | Elasticsearch default value | Elasticsearch [_type field](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field). |
+| auth          | array   | False    |                             | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) configuration. |
+| auth.username | string  | True     |                             | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) username. |
+| auth.password | string  | True     |                             | Elasticsearch [authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html) password. |
+| ssl_verify    | boolean | False    | true                        | When set to `true` enables SSL verification as per [OpenResty docs](https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake). |
+| timeout       | integer | False    | 10                          | Elasticsearch send data timeout in seconds.                  |
+
+This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
+
+## Enabling the Plugin
+
+### Full configuration
+
+The example below shows a complete configuration of the Plugin on a specific Route:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{
+        "elasticsearch-logger":{
+            "endpoint_addr":"http://127.0.0.1:9200",
+            "field":{
+                "index":"services",
+                "type":"collector"
+            },
+            "auth":{
+                "username":"elastic",
+                "password":"123456"
+            },
+            "ssl_verify":false,
+            "timeout": 60,
+            "retry_delay":1,
+            "buffer_duration":60,
+            "max_retry_count":0,
+            "batch_max_size":1000,
+            "inactive_timeout":5,
+            "name":"elasticsearch-logger"
+        }
+    },
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
+
+### Minimal configuration example
+
+The example below shows a bare minimum configuration of the Plugin on a Route:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{
+        "elasticsearch-logger":{
+            "endpoint_addr":"http://127.0.0.1:9200",
+            "field":{
+                "index":"services"
+            }
+        }
+    },
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
+
+## Example usage
+
+Once you have configured the Route to use the Plugin, when you make a request to APISIX, it will be logged in your Elasticsearch server:
+
+```shell
+curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+You should be able to get the log from elasticsearch:
+
+```shell
+curl -X GET "http://127.0.0.1:9200/services/_search" | jq .
+{
+  "took": 0,
+   ...
+    "hits": [
+      {
+        "_index": "services",
+        "_type": "_doc",
+        "_id": "M1qAxYIBRmRqWkmH4Wya",
+        "_score": 1,
+        "_source": {
+          "apisix_latency": 0,
+          "route_id": "1",
+          "server": {
+            "version": "2.15.0",
+            "hostname": "apisix"
+          },
+          "request": {
+            "size": 102,
+            "uri": "/elasticsearch.do?q=hello",
+            "querystring": {
+              "q": "hello"
+            },
+            "headers": {
+              "user-agent": "curl/7.29.0",
+              "host": "127.0.0.1:9080",
+              "accept": "*/*"
+            },
+            "url": "http://127.0.0.1:9080/elasticsearch.do?q=hello",
+            "method": "GET"
+          },
+          "service_id": "",
+          "latency": 0,
+          "upstream": "127.0.0.1:1980",
+          "upstream_latency": 1,
+          "client_ip": "127.0.0.1",
+          "start_time": 1661170929107,
+          "response": {
+            "size": 192,
+            "headers": {
+              "date": "Mon, 22 Aug 2022 12:22:09 GMT",
+              "server": "APISIX/2.15.0",
+              "content-type": "text/plain; charset=utf-8",
+              "connection": "close",
+              "transfer-encoding": "chunked"
+            },
+            "status": 200
+          }
+        }
+      }
+    ]
+  }
+}
+```
+
+## Metadata
+
+You can also set the format of the logs by configuring the Plugin metadata. The following configurations are available:
+
+| Name       | Type   | Required | Default                                                      | Description                                                  |
+| ---------- | ------ | -------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
+| log_format | object | False    | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX](https://github.com/apache/apisix/blob/master/docs/en/latest/apisix-variable.md) or [Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by prefixing the string with `$`. |
+
+:::info IMPORTANT
+
+Configuring the Plugin metadata is global in scope. This means that it will take effect on all Routes and Services which use the `elasticsearch-logger` Plugin.
+
+:::
+
+The example below shows how you can configure through the Admin API:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "log_format": {
+        "host": "$host",
+        "@timestamp": "$time_iso8601",
+        "client_ip": "$remote_addr"
+    }
+}'
+```
+
+With this configuration, your logs would be formatted as shown below:
+
+```shell
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+```
+
+ make a request to APISIX again:
+
+```shell
+curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+You should be able to get this log from elasticsearch:
+
+```shell
+curl -X GET "http://127.0.0.1:9200/services/_search" | jq .
+{
+  "took": 0,
+  ...
+  "hits": {
+    "total": {
+      "value": 1,
+      "relation": "eq"
+    },
+    "max_score": 1,
+    "hits": [
+      {
+        "_index": "services",
+        "_type": "_doc",
+        "_id": "NVqExYIBRmRqWkmH4WwG",
+        "_score": 1,
+        "_source": {
+          "@timestamp": "2022-08-22T20:26:31+08:00",
+          "client_ip": "127.0.0.1",
+          "host": "127.0.0.1",
+          "route_id": "1"
+        }
+      }
+    ]
+  }
+}
+```
+
+### Disable Metadata
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X DELETE
+```
+
+## Disable Plugin
+
+To disable the `elasticsearch-logger` Plugin, you can delete the corresponding JSON configuration from the Plugin configuration. APISIX will automatically reload and you do not have to restart for this to take effect.
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{},
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
diff --git a/docs/zh/latest/README.md b/docs/zh/latest/README.md
index 1e1aad369..4c5f6004c 100644
--- a/docs/zh/latest/README.md
+++ b/docs/zh/latest/README.md
@@ -143,7 +143,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、
     - 高性能:在单核上 QPS 可以达到 18k,同时延迟只有 0.2 毫秒。
     - [故障注入](plugins/fault-injection.md)
     - [REST Admin API](admin-api.md):使用 REST Admin API 来控制 Apache APISIX,默认只允许 127.0.0.1 访问,你可以修改 `conf/config.yaml` 中的 `allow_admin` 字段,指定允许调用 Admin API 的 IP 列表。同时需要注意的是,Admin API 使用 key auth 来校验调用者身份,**在部署前需要修改 `conf/config.yaml` 中的 `admin_key` 字段,来保证安全。**
-    - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md)、[TCP Logger](plugins/tcp-logger.md)、[Kafka Logger](plugins/kafka-logger.md)、[UDP Logger](plugins/udp-logger.md)、[RocketMQ Logger](plugins/rocketmq-logger.md)、[SkyWalking Logger](plugins/skywalking-logger.md)、[Alibaba Cloud Logging(SLS)](plugins/sls-logger.md)、[Google Cloud Logging](plugins/google-cloud-logging.md)、[Splunk HEC Logging](plugins/splunk-hec-logging.md)、[File Logger](plugins/file-logger.md))
+    - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md)、[TCP Logger](plugins/tcp-logger.md)、[Kafka Logger](plugins/kafka-logger.md)、[UDP Logger](plugins/udp-logger.md)、[RocketMQ Logger](plugins/rocketmq-logger.md)、[SkyWalking Logger](plugins/skywalking-logger.md)、[Alibaba Cloud Logging(SLS)](plugins/sls-logger.md)、[Google Cloud Logging](plugins/google-cloud-logging.md)、[Splunk HEC Logging](plugins/splunk-hec-logging.md)、[File Logger](plugins/file-logger.md)、[Elasticsearch L [...]
     - [Helm charts](https://github.com/apache/apisix-helm-chart)
 
 - **高度可扩展**
diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json
index 6d70ff30e..686e68a00 100644
--- a/docs/zh/latest/config.json
+++ b/docs/zh/latest/config.json
@@ -153,7 +153,8 @@
                 "plugins/google-cloud-logging",
                 "plugins/splunk-hec-logging",
                 "plugins/file-logger",
-                "plugins/loggly"
+                "plugins/loggly",
+                "plugins/elasticsearch-logger"
               ]
             }
           ]
diff --git a/docs/zh/latest/plugins/elasticsearch-logger.md b/docs/zh/latest/plugins/elasticsearch-logger.md
new file mode 100644
index 000000000..82e035d05
--- /dev/null
+++ b/docs/zh/latest/plugins/elasticsearch-logger.md
@@ -0,0 +1,278 @@
+---
+title: elasticsearch-logger
+keywords:
+  - APISIX
+  - API 网关
+  - 插件
+  - Elasticsearch-logger
+  - 日志
+description: 本文介绍了 API 网关 Apache APISIX 的 elasticsearch-logger 插件。使用该插件可以将 APISIX 的日志数据推送到 Elasticserach。
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## 描述
+
+`elasticsearch-logger` 插件用于将 `Apache APISIX` 的请求日志转发到 `Elasticsearch` 中进行分析和存储。
+
+启用该插件后 APISIX 将在 `Log Phase` 获取请求上下文信息并序列化为 [Bulk 格式](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk) 后提交到批处理队列中,当触发批处理队列每批次最大处理容量或刷新缓冲区的最大时间时会将队列中的数据提交到 Elaticsearch 中。更多信息,请参考 [Batch-Processor](./batch-processor.md)。
+
+## 属性
+
+| 名称          | 类型    | 必选项 | 默认值               | 描述                                                         |
+| ------------- | ------- | -------- | -------------------- | ------------------------------------------------------------ |
+| endpoint_addr | string  | 是       |                      | Elasticsearch API。                                           |
+| field         | array   | 是       |                      | Elasticsearch `field`配置信息。                                |
+| field.index   | string  | 是       |                      | Elasticsearch `[_index field](https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-index-field.html#mapping-index-field)`。 |
+| field.type    | string  | 否       | Elasticsearch 默认值 | Elasticsearch `[_type field](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/mapping-type-field.html#mapping-type-field)` |
+| auth          | array   | 否       |                      | Elasticsearch `[authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)` 配置信息 |
+| auth.username | string  | 是       |                      | Elasticsearch `[authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)` 用户名。 |
+| auth.password | string  | 是       |                      | Elasticsearch `[authentication](https://www.elastic.co/guide/en/elasticsearch/reference/current/setting-up-authentication.html)` 密码。 |
+| ssl_verify    | boolean | 否       | true                 | 当设置为 `true` 时则启用 SSL 验证。更多信息请参考 [lua-nginx-module](https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake)。 |
+| timeout       | integer | 否       | 10                   | 发送给 Elasticsearch 请求超时时间。                            |
+
+本插件支持使用批处理器来聚合并批量处理条目(日志和数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解或自定义批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。
+
+## 启用插件
+
+你可以通过如下命令在指定路由上启用 `elasticsearch-logger` 插件:
+
+### 完整配置示例
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{
+        "elasticsearch-logger":{
+            "endpoint_addr":"http://127.0.0.1:9200",
+            "field":{
+                "index":"services",
+                "type":"collector"
+            },
+            "auth":{
+                "username":"elastic",
+                "password":"123456"
+            },
+            "ssl_verify":false,
+            "timeout": 60,
+            "retry_delay":1,
+            "buffer_duration":60,
+            "max_retry_count":0,
+            "batch_max_size":1000,
+            "inactive_timeout":5,
+            "name":"elasticsearch-logger"
+        }
+    },
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
+
+### 最小化配置示例
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{
+        "elasticsearch-logger":{
+            "endpoint_addr":"http://127.0.0.1:9200",
+            "field":{
+                "index":"services"
+            }
+        }
+    },
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
+
+## 测试插件
+
+向配置 `elasticsearch-logger` 插件的路由发送请求
+
+```shell
+curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+现在,你可以从 Elasticsearch 获取相关日志。
+
+```shell
+curl -X GET "http://127.0.0.1:9200/services/_search" | jq .
+{
+  "took": 0,
+   ...
+    "hits": [
+      {
+        "_index": "services",
+        "_type": "_doc",
+        "_id": "M1qAxYIBRmRqWkmH4Wya",
+        "_score": 1,
+        "_source": {
+          "apisix_latency": 0,
+          "route_id": "1",
+          "server": {
+            "version": "2.15.0",
+            "hostname": "apisix"
+          },
+          "request": {
+            "size": 102,
+            "uri": "/elasticsearch.do?q=hello",
+            "querystring": {
+              "q": "hello"
+            },
+            "headers": {
+              "user-agent": "curl/7.29.0",
+              "host": "127.0.0.1:9080",
+              "accept": "*/*"
+            },
+            "url": "http://127.0.0.1:9080/elasticsearch.do?q=hello",
+            "method": "GET"
+          },
+          "service_id": "",
+          "latency": 0,
+          "upstream": "127.0.0.1:1980",
+          "upstream_latency": 1,
+          "client_ip": "127.0.0.1",
+          "start_time": 1661170929107,
+          "response": {
+            "size": 192,
+            "headers": {
+              "date": "Mon, 22 Aug 2022 12:22:09 GMT",
+              "server": "APISIX/2.15.0",
+              "content-type": "text/plain; charset=utf-8",
+              "connection": "close",
+              "transfer-encoding": "chunked"
+            },
+            "status": 200
+          }
+        }
+      }
+    ]
+  }
+}
+```
+
+## 插件元数据设置
+
+| 名称       | 类型   | 必选项 | 默认值                                                       | 有效值 | 描述                                                         |
+| ---------- | ------ | ------ | ------------------------------------------------------------ | ------ | ------------------------------------------------------------ |
+| log_format | object | 可选   | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} |        | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 [APISIX 变量](https://github.com/apache/apisix/blob/master/docs/en/latest/apisix-variable.md) 或 [Nginx 内置变量](http://nginx.org/en/docs/varindex.html)。请注意,**该设置是全局生效的**,因此在指定 log_format 后,将对所有绑定 elasticsearch-logger 的 Route 或 Service 生效。 |
+
+### 设置日志格式示例
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "log_format": {
+        "host": "$host",
+        "@timestamp": "$time_iso8601",
+        "client_ip": "$remote_addr"
+    }
+}'
+```
+
+在日志收集处,将得到类似下面的日志:
+
+```json
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+```
+
+向配置 `elasticsearch-logger` 插件的路由发送请求
+
+```shell
+curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+现在,你可以从 Elasticsearch 获取相关日志。
+
+```shell
+curl -X GET "http://127.0.0.1:9200/services/_search" | jq .
+{
+  "took": 0,
+  ...
+  "hits": {
+    "total": {
+      "value": 1,
+      "relation": "eq"
+    },
+    "max_score": 1,
+    "hits": [
+      {
+        "_index": "services",
+        "_type": "_doc",
+        "_id": "NVqExYIBRmRqWkmH4WwG",
+        "_score": 1,
+        "_source": {
+          "@timestamp": "2022-08-22T20:26:31+08:00",
+          "client_ip": "127.0.0.1",
+          "host": "127.0.0.1",
+          "route_id": "1"
+        }
+      }
+    ]
+  }
+}
+```
+
+### 禁用插件元数据
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X DELETE
+```
+
+## 禁用插件
+
+当你需要禁用该插件时,可以通过如下命令删除相应的 JSON 配置,APISIX 将会自动重新加载相关配置,无需重启服务:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 \
+-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins":{},
+    "upstream":{
+        "type":"roundrobin",
+        "nodes":{
+            "127.0.0.1:1980":1
+        }
+    },
+    "uri":"/elasticsearch.do"
+}'
+```
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index 70bea040a..ed9058c13 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -109,6 +109,7 @@ grpc-web
 public-api
 prometheus
 datadog
+elasticsearch-logger
 echo
 loggly
 http-logger
diff --git a/t/plugin/elasticsearch-logger.t b/t/plugin/elasticsearch-logger.t
new file mode 100644
index 000000000..2e82953f4
--- /dev/null
+++ b/t/plugin/elasticsearch-logger.t
@@ -0,0 +1,453 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local ok, err
+            local configs = {
+                -- full configuration
+                {
+                    endpoint_addr = "http://127.0.0.1:9200",
+                    field = {
+                        index = "services",
+                        type = "collector"
+                    },
+                    auth = {
+                        username = "elastic",
+                        password = "123456"
+                    },
+                    ssl_verify = false,
+                    timeout = 60,
+                    max_retry_count = 0,
+                    retry_delay = 1,
+                    buffer_duration = 60,
+                    inactive_timeout = 2,
+                    batch_max_size = 10,
+                },
+                -- minimize configuration
+                {
+                    endpoint_addr = "http://127.0.0.1:9200",
+                    field = {
+                        index = "services"
+                    }
+                },
+                -- property "endpoint_addr" is required
+                {
+                    field = {
+                        index = "services"
+                    }
+                },
+                -- property "field" is required
+                {
+                    endpoint_addr = "http://127.0.0.1:9200",
+                },
+                -- property "index" is required
+                {
+                    endpoint_addr = "http://127.0.0.1:9200",
+                    field = {}
+                },
+                -- property "endpoint" must not end with "/"
+                {
+                    endpoint_addr = "http://127.0.0.1:9200/",
+                    field = {
+                        index = "services"
+                    }
+                }
+            }
+
+            local plugin = require("apisix.plugins.elasticsearch-logger")
+            for i = 1, #configs do
+                ok, err = plugin.check_schema(configs[i])
+                if err then
+                    ngx.say(err)
+                else
+                    ngx.say("passed")
+                end
+            end
+        }
+    }
+--- response_body_like
+passed
+passed
+property "endpoint_addr" is required
+property "field" is required
+property "field" validation failed: property "index" is required
+property "endpoint_addr" validation failed: failed to match pattern "\[\^/\]\$" with "http://127.0.0.1:9200/"
+
+
+
+=== TEST 2: set route
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/elasticsearch-logger',
+                               ngx.HTTP_DELETE)
+
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9200",
+                        field = {
+                            index = "services"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 3: test route (success write)
+--- extra_init_by_lua
+    local core = require("apisix.core")
+    local http = require("resty.http")
+    local ngx_re = require("ngx.re")
+    local log_util = require("apisix.utils.log-util")
+    log_util.get_full_log = function(ngx, conf)
+        return {
+            test = "test"
+        }
+    end
+
+    http.request_uri = function(self, uri, params)
+        if not params.body or type(params.body) ~= "string" then
+            return nil, "invalid params body"
+        end
+
+        local arr = ngx_re.split(params.body, "\n")
+        if not arr or #arr ~= 2 then
+            return nil, "invalid params body"
+        end
+
+        local entry = core.json.decode(arr[2])
+        local origin_entry = log_util.get_full_log(ngx, {})
+        for k, v in pairs(origin_entry) do
+            local vv = entry[k]
+            if not vv or vv ~= v then
+                return nil, "invalid params body"
+            end
+        end
+
+        core.log.error("check elasticsearch full log body success")
+        return {
+            status = 200,
+            body = "success"
+        }, nil
+    end
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+check elasticsearch full log body success
+
+
+
+=== TEST 4: set route (auth)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9201",
+                        field = {
+                            index = "services"
+                        },
+                        auth = {
+                            username = "elastic",
+                            password = "123456"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: test route (auth success)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] successfully processed the entries
+
+
+
+=== TEST 6: set route (no auth)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9201",
+                        field = {
+                            index = "services"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 7: test route (no auth, failed)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] failed to process entries: elasticsearch server returned status: 401
+"reason":"missing authentication credentials for REST request [/_bulk]"
+Batch Processor[elasticsearch-logger] exceeded the max_retry_count
+
+
+
+=== TEST 8: set route (error auth)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9201",
+                        field = {
+                            index = "services"
+                        },
+                        auth = {
+                            username = "elastic",
+                            password = "111111"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: test route (error auth failed)
+--- request
+GET /hello
+--- wait: 2
+--- response_body
+hello world
+--- error_log
+Batch Processor[elasticsearch-logger] failed to process entries
+Batch Processor[elasticsearch-logger] exceeded the max_retry_count
+
+
+
+=== TEST 10: add plugin metadata
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/elasticsearch-logger',
+                ngx.HTTP_PUT, [[{
+                    "log_format": {
+                        "custom_host": "$host",
+                        "custom_timestamp": "$time_iso8601",
+                        "custom_client_ip": "$remote_addr"
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, {
+                uri = "/hello",
+                upstream = {
+                    type = "roundrobin",
+                    nodes = {
+                        ["127.0.0.1:1980"] = 1
+                    }
+                },
+                plugins = {
+                    ["elasticsearch-logger"] = {
+                        endpoint_addr = "http://127.0.0.1:9201",
+                        field = {
+                            index = "services"
+                        },
+                        batch_max_size = 1,
+                        inactive_timeout = 1
+                    }
+                }
+            })
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body_like
+passed
+passed
+
+
+
+=== TEST 11: hit route and check custom elasticsearch logger
+--- extra_init_by_lua
+    local core = require("apisix.core")
+    local http = require("resty.http")
+    local ngx_re = require("ngx.re")
+    local log_util = require("apisix.utils.log-util")
+    log_util.get_custom_format_log = function(ctx, format)
+        return {
+            test = "test"
+        }
+    end
+
+    http.request_uri = function(self, uri, params)
+        if not params.body or type(params.body) ~= "string" then
+            return nil, "invalid params body"
+        end
+
+        local arr = ngx_re.split(params.body, "\n")
+        if not arr or #arr ~= 2 then
+            return nil, "invalid params body"
+        end
+
+        local entry = core.json.decode(arr[2])
+        local origin_entry = log_util.get_custom_format_log(nil, nil)
+        for k, v in pairs(origin_entry) do
+            local vv = entry[k]
+            if not vv or vv ~= v then
+                return nil, "invalid params body"
+            end
+        end
+
+        core.log.error("check elasticsearch custom body success")
+        return {
+            status = 200,
+            body = "success"
+        }, nil
+    end
+--- request
+GET /hello
+--- response_body
+hello world
+--- wait: 2
+--- error_log
+check elasticsearch custom body success