You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by mo...@apache.org on 2023/05/29 02:01:55 UTC

[apisix] branch master updated: feat: add loki-logger plugin (#9399)

This is an automated email from the ASF dual-hosted git repository.

monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 66cd80f77 feat: add loki-logger plugin (#9399)
66cd80f77 is described below

commit 66cd80f77cea3631996a13702ed131d4b2a84246
Author: Zeping Bai <bz...@apache.org>
AuthorDate: Mon May 29 10:01:44 2023 +0800

    feat: add loki-logger plugin (#9399)
---
 apisix/plugins/loki-logger.lua        | 234 ++++++++++++++++++++++++++
 ci/pod/docker-compose.plugin.yml      |  10 ++
 conf/config-default.yaml              |   1 +
 docs/en/latest/config.json            |   3 +-
 docs/en/latest/plugins/loki-logger.md | 165 ++++++++++++++++++
 t/admin/plugins.t                     |   1 +
 t/lib/grafana_loki.lua                |  63 +++++++
 t/plugin/loki-logger.t                | 308 ++++++++++++++++++++++++++++++++++
 8 files changed, 784 insertions(+), 1 deletion(-)

diff --git a/apisix/plugins/loki-logger.lua b/apisix/plugins/loki-logger.lua
new file mode 100644
index 000000000..593fc8b18
--- /dev/null
+++ b/apisix/plugins/loki-logger.lua
@@ -0,0 +1,234 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local bp_manager_mod  = require("apisix.utils.batch-processor-manager")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local new_tab         = require("table.new")
+
+local pairs        = pairs
+local ipairs       = ipairs
+local tostring     = tostring
+local math_random  = math.random
+local table_insert = table.insert
+local ngx          = ngx
+local str_format   = core.string.format
+
+local plugin_name = "loki-logger"
+local batch_processor_manager = bp_manager_mod.new("loki logger")
+
+-- JSON schema for the per-route plugin configuration. Batch processor
+-- fields (batch_max_size, etc.) are merged in later via
+-- batch_processor_manager:wrap_schema() when building _M.schema.
+local schema = {
+    type = "object",
+    properties = {
+        -- core configurations
+        endpoint_addrs = {
+            type = "array",
+            minItems = 1,
+            items = core.schema.uri_def,
+        },
+        endpoint_uri = {
+            type = "string",
+            minLength = 1,
+            default = "/loki/api/v1/push"
+        },
+        -- sent as the X-Scope-OrgID header; "fake" is Loki's default
+        -- tenant ID when multi-tenancy/auth is disabled
+        tenant_id = {type = "string", default = "fake"},
+        -- stream labels attached to every pushed entry; values may contain
+        -- $-prefixed variables, resolved in the flush function of _M.log
+        log_labels = {
+            type = "object",
+            patternProperties = {
+                [".*"] = {
+                    type = "string",
+                    minLength = 1,
+                },
+            },
+            default = {
+                job = "apisix",
+            },
+        },
+
+        -- connection layer configurations
+        ssl_verify = {type = "boolean", default = false},
+        timeout = {
+            type = "integer",
+            minimum = 1,
+            maximum = 60000,
+            default = 3000,
+            description = "timeout in milliseconds",
+        },
+        keepalive = {type = "boolean", default = true},
+        keepalive_timeout = {
+            type = "integer",
+            minimum = 1000,
+            default = 60000,
+            description = "keepalive timeout in milliseconds",
+        },
+        keepalive_pool = {type = "integer", minimum = 1, default = 5},
+
+        -- logger related configurations
+        log_format = {type = "object"},
+        include_req_body = {type = "boolean", default = false},
+        -- lua-resty-expr expression list: the request body is only logged
+        -- when the expression evaluates to true (used with include_req_body)
+        include_req_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array"
+            }
+        },
+        include_resp_body = {type = "boolean", default = false},
+        -- same as include_req_body_expr, but for the response body
+        include_resp_body_expr = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "array"
+            }
+        },
+    },
+    required = {"endpoint_addrs"}
+}
+
+
+-- Plugin metadata schema: a cluster-wide log_format override shared by
+-- every route that uses this plugin.
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 414,  -- between datadog (495) and elasticsearch-logger (413)
+    name = plugin_name,
+    schema = batch_processor_manager:wrap_schema(schema),
+    metadata_schema = metadata_schema,
+}
+
+
+-- Validate the plugin configuration, or the plugin metadata when
+-- schema_type is TYPE_METADATA. On success also validates any custom
+-- log_format via log-util. Returns true, or nil plus an error message.
+-- NOTE(review): this checks against the local unwrapped `schema`, not the
+-- wrapped _M.schema; batch-processor fields still pass because
+-- additionalProperties is not restricted -- confirm this is intended.
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+
+    local ok, err = core.schema.check(schema, conf)
+    if not ok then
+        return nil, err
+    end
+    return log_util.check_log_schema(conf)
+end
+
+
+-- Deliver one JSON-encoded batch payload to a Loki push endpoint over
+-- HTTP(S). Returns true on success, or false plus an error message.
+local function send_http_data(conf, log)
+    local params = {
+        headers = {
+            ["Content-Type"] = "application/json",
+            -- Loki identifies the tenant via this header
+            ["X-Scope-OrgID"] = conf.tenant_id,
+        },
+        keepalive = conf.keepalive,
+        ssl_verify = conf.ssl_verify,
+        method = "POST",
+        body = core.json.encode(log)
+    }
+
+    if conf.keepalive then
+        params.keepalive_timeout = conf.keepalive_timeout
+        params.keepalive_pool = conf.keepalive_pool
+    end
+
+    local httpc, err = http.new()
+    if not httpc then
+        return false, str_format("create http client error: %s", err)
+    end
+    httpc:set_timeout(conf.timeout)
+
+    -- select a random endpoint and build URL
+    local endpoint_url = conf.endpoint_addrs[math_random(#conf.endpoint_addrs)] .. conf.endpoint_uri
+    local res, err = httpc:request_uri(endpoint_url, params)
+    if not res then
+        return false, err
+    end
+
+    -- any non-2xx response is treated as a delivery failure
+    if res.status >= 300 then
+        return false, str_format("loki server returned status: %d, body: %s",
+            res.status, res.body or "")
+    end
+
+    return true
+end
+
+
+-- Capture the response body for logging when include_resp_body(_expr) is
+-- configured; delegates to log-util's shared collector.
+function _M.body_filter(conf, ctx)
+    log_util.collect_body(conf, ctx)
+end
+
+
+-- Build one log entry per request and hand it to the batch processor.
+-- The flush function converts buffered entries into a Loki push payload
+-- (one stream, one value per entry) and sends it via send_http_data().
+function _M.log(conf, ctx)
+    local entry = log_util.get_log_entry(plugin_name, conf, ctx)
+
+    if not entry.route_id then
+        entry.route_id = "no-matched"
+    end
+
+    -- insert start time as log time, multiply to nanoseconds
+    -- use string concat to circumvent 64bit integers that LuaVM cannot handle
+    -- that is, first process the decimal part of the millisecond value
+    -- and then add 6 zeros by string concatenation
+    entry.loki_log_time = tostring(ngx.req.start_time() * 1000) .. "000000"
+
+    -- fast path: a processor for this conf already exists and accepts the entry
+    if batch_processor_manager:add_entry(conf, entry) then
+        return
+    end
+
+    -- generate a function to be executed by the batch processor
+    -- NOTE(review): this closure captures the ctx of the request that
+    -- created the processor, so $vars in log_labels are resolved against
+    -- that first request only -- confirm this is intended
+    local func = function(entries)
+        -- aliases conf.log_labels: assignments below mutate the shared conf
+        local labels = conf.log_labels
+
+        -- parsing possible variables in label value
+        -- NOTE(review): labels[key] = new_val writes the resolved value back
+        -- into conf.log_labels, permanently replacing the $var template for
+        -- later flushes -- verify this one-shot resolution is intended
+        for key, value in pairs(labels) do
+            local new_val, err, n_resolved = core.utils.resolve_var(value, ctx.var)
+            if not err and n_resolved > 0 then
+                labels[key] = new_val
+            end
+        end
+
+        -- build loki request data
+        local data = {
+            streams = {
+                {
+                    stream = labels,
+                    values = new_tab(1, 0),
+                }
+            }
+        }
+
+        -- add all entries to the batch
+        for _, entry in ipairs(entries) do
+            local log_time = entry.loki_log_time
+            entry.loki_log_time = nil -- clean logger internal field
+
+            table_insert(data.streams[1].values, {
+                log_time, core.json.encode(entry)
+            })
+        end
+
+        return send_http_data(conf, data)
+    end
+
+    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+end
+
+
+return _M
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 606929125..58e305c8f 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -143,6 +143,15 @@ services:
       - ./t/certs:/certs
 
 
+  ## Grafana Loki
+  loki:
+    image: grafana/loki:2.8.0
+    command: -config.file=/etc/loki/local-config.yaml -auth.enabled -querier.multi-tenant-queries-enabled
+    ports:
+      - "3100:3100"
+    networks:
+      - loki_net
+
   rocketmq_namesrv:
     image: apacherocketmq/rocketmq:4.6.0
     container_name: rmqnamesrv
@@ -351,3 +360,4 @@ networks:
   opa_net:
   vector_net:
   clickhouse_net:
+  loki_net:
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 4f97adc4e..d41df397b 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -467,6 +467,7 @@ plugins:                          # plugin list (sorted by priority)
   - public-api                     # priority: 501
   - prometheus                     # priority: 500
   - datadog                        # priority: 495
+  - loki-logger                    # priority: 414
   - elasticsearch-logger           # priority: 413
   - echo                           # priority: 412
   - loggly                         # priority: 411
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index d752359a6..496a44ff9 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -181,7 +181,8 @@
                 "plugins/file-logger",
                 "plugins/loggly",
                 "plugins/elasticsearch-logger",
-                "plugins/tencent-cloud-cls"
+                "plugins/tencent-cloud-cls",
+                "plugins/loki-logger"
               ]
             }
           ]
diff --git a/docs/en/latest/plugins/loki-logger.md b/docs/en/latest/plugins/loki-logger.md
new file mode 100644
index 000000000..6bd3ae8d6
--- /dev/null
+++ b/docs/en/latest/plugins/loki-logger.md
@@ -0,0 +1,165 @@
+---
+title: loki-logger
+keywords:
+  - Apache APISIX
+  - API Gateway
+  - Plugin
+  - Loki-logger
+  - Grafana Loki
+description: This document contains information about the Apache APISIX loki-logger Plugin.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Description
+
+The `loki-logger` plugin is used to forward logs to [Grafana Loki](https://grafana.com/oss/loki/) for analysis and storage.
+
+When the Plugin is enabled, APISIX will serialize the request context information to [Log entries in JSON](https://grafana.com/docs/loki/latest/api/#push-log-entries-to-loki) and submit it to the batch queue. When the maximum batch size is exceeded, the data in the queue is pushed to Grafana Loki. See [batch processor](../batch-processor.md) for more details.
+
+## Attributes
+
+| Name | Type | Required | Default | Description |
+|---|---|---|---|---|
+| endpoint_addrs | array[string] | True |  | Loki API base URL, format like http://127.0.0.1:3100, supports HTTPS and domain names. If multiple endpoints are configured, they will be written randomly. |
+| endpoint_uri | string | False | /loki/api/v1/push | If you are using a log collection service that is compatible with the Loki Push API, you can use this configuration item to customize the API path. |
+| tenant_id | string | False | fake | Loki tenant ID. According to Loki's [multi-tenancy documentation](https://grafana.com/docs/loki/latest/operations/multi-tenancy/#multi-tenancy), its default value is set to the default value `fake` under single-tenancy. |
+| log_labels | object | False | {job = "apisix"} | Loki log label. [APISIX variables](../apisix-variable.md) and [Nginx variables](http://nginx.org/en/docs/varindex.html) can be used by prefixing the string with `$`, both individual and combined, such as `$host` or `$remote_addr:$remote_port`. |
+| ssl_verify        | boolean       | False    | false | When set to `true`, verifies the SSL certificate. |
+| timeout           | integer       | False    | 3000    | Timeout in milliseconds for the request to the Loki service. Range: [1, 60000]. |
+| keepalive         | boolean       | False    | true | When set to `true`, keeps the connection alive for multiple requests. |
+| keepalive_timeout | integer       | False    | 60000   | Idle time in milliseconds after which the connection is closed. Range: [1000, ...]. |
+| keepalive_pool    | integer       | False    | 5       | Connection pool limit. Range: [1, ...]. |
+| log_format | object | False    |          | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX variables](../apisix-variable.md) and [Nginx variables](http://nginx.org/en/docs/varindex.html) can be used by prefixing the string with `$`. |
+| include_req_body       | boolean | False    | false | When set to `true` includes the request body in the log. If the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitations. |
+| include_req_body_expr  | array   | False    |  | Filter for when the `include_req_body` attribute is set to `true`. Request body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
+| include_resp_body      | boolean | False    | false | When set to `true` includes the response body in the log. |
+| include_resp_body_expr | array   | False    |  | Filter for when the `include_resp_body` attribute is set to `true`. Response body is only logged when the expression set here evaluates to `true`. See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more. |
+
+This plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
+
+## Metadata
+
+You can also set the format of the logs by configuring the Plugin metadata. The following configurations are available:
+
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| log_format | object | False | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | Log format declared as key value pairs in JSON format. Values only support strings. [APISIX variables](../apisix-variable.md) and [Nginx variables](http://nginx.org/en/docs/varindex.html) can be used by prefixing the string with `$`. |
+
+:::info IMPORTANT
+
+Configuring the plugin metadata is global in scope. This means that it will take effect on all Routes and Services which use the `loki-logger` plugin.
+
+:::
+
+The example below shows how you can configure through the Admin API:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/loki-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "log_format": {
+        "host": "$host",
+        "@timestamp": "$time_iso8601",
+        "client_ip": "$remote_addr"
+    }
+}'
+```
+
+With this configuration, your logs would be formatted as shown below:
+
+```shell
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"}
+```
+
+## Enabling the plugin
+
+The example below shows how you can enable the `loki-logger` plugin on a specific Route:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+        "loki-logger": {
+            "endpoint_addrs" : ["http://127.0.0.1:3100"]
+        }
+    },
+    "upstream": {
+       "nodes": {
+           "127.0.0.1:1980": 1
+       },
+       "type": "roundrobin"
+    },
+    "uri": "/hello"
+}'
+```
+
+## Example usage
+
+Now, if you make a request to APISIX, it will be logged in your Loki server:
+
+```shell
+curl -i http://127.0.0.1:9080/hello
+```
+
+## Delete the plugin
+
+When you need to remove the `loki-logger` plugin, you can delete the corresponding JSON configuration with the following command and APISIX will automatically reload the relevant configuration without restarting the service:
+
+```shell
+curl http://127.0.0.1:9180/apisix/admin/routes/1  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "methods": ["GET"],
+    "uri": "/hello",
+    "plugins": {},
+    "upstream": {
+        "type": "roundrobin",
+        "nodes": {
+            "127.0.0.1:1980": 1
+        }
+    }
+}'
+```
+
+## FAQ
+
+### Logs are not pushed properly
+
+Look at `error.log` for such a log.
+
+```text
+2023/04/30 13:45:46 [error] 19381#19381: *1075673 [lua] batch-processor.lua:95: Batch Processor[loki logger] failed to process entries: loki server returned status: 401, body: no org id, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:9081
+```
+
+The error can be diagnosed from the status code in the error message (`failed to process entries: loki server returned status: 401, body: no org id`) and the response body returned by the Loki server.
+
+### Getting errors when QPS is high?
+
+- Make sure the `keepalive`-related configuration is set properly. See [Attributes](#attributes) for more information.
+- Check the logs in `error.log`, look for such a log.
+
+    ```text
+    2023/04/30 13:49:34 [error] 19381#19381: *1082680 [lua] batch-processor.lua:95: Batch Processor[loki logger] failed to process entries: loki server returned status: 429, body: Ingestion rate limit exceeded for user tenant_1 (limit: 4194304 bytes/sec) while attempting to ingest '1000' lines totaling '616307' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased, context: ngx.timer, client: 127.0.0.1, server: 0.0.0.0:9081
+    ```
+
+  - The logs usually associated with high QPS look like the above. The error is: `Ingestion rate limit exceeded for user tenant_1 (limit: 4194304 bytes/sec) while attempting to ingest '1000' lines totaling '616307' bytes, reduce log volume or contact your Loki administrator to see if the limit can be increased`.
+  - Refer to [Loki documentation](https://grafana.com/docs/loki/latest/configuration/#limits_config) to add limits on the amount of default and burst logs, such as `ingestion_rate_mb` and `ingestion_burst_size_mb`.
+
+    In testing during development, setting `ingestion_burst_size_mb` to 100 allowed APISIX to push logs correctly at at least 10000 RPS.
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index ceb4df15b..ae7617dfd 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -109,6 +109,7 @@ grpc-web
 public-api
 prometheus
 datadog
+loki-logger
 elasticsearch-logger
 echo
 loggly
diff --git a/t/lib/grafana_loki.lua b/t/lib/grafana_loki.lua
new file mode 100644
index 000000000..fc1173982
--- /dev/null
+++ b/t/lib/grafana_loki.lua
@@ -0,0 +1,63 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local cjson = require("cjson")
+local http = require("resty.http")
+
+local _M = {}
+
+
+-- Query log entries from a Grafana Loki instance via the query_range API;
+-- used by the loki-logger plugin tests.
+-- @param from    range start timestamp (string, nanoseconds)
+-- @param to      range end timestamp (string, nanoseconds)
+-- @param options optional table: direction, limit, query, url, headers
+-- @return decoded response table on success, or nil plus an error string
+function _M.fetch_logs_from_loki(from, to, options)
+    options = options or {}
+
+    local direction = options.direction or "backward"
+    local limit = options.limit or "10"
+    local query = options.query or [[{job="apisix"} | json]]
+    local url = options.url or "http://127.0.0.1:3100/loki/api/v1/query_range"
+    local headers = options.headers or {
+        ["X-Scope-OrgID"] = "tenant_1"
+    }
+
+    local httpc = http.new()
+    local res, err = httpc:request_uri(url, {
+        query = {
+            start = from,
+            ["end"] = to,
+            direction = direction,
+            limit = limit,
+            query = query,
+        },
+        headers = headers
+    })
+
+    if not res or err then
+        return nil, err
+    end
+
+    -- treat any non-2xx status as a failure; the original `> 300` wrongly
+    -- accepted status 300, and this matches the plugin's own `>= 300` check
+    if res.status >= 300 then
+        return nil, "HTTP status code: " .. res.status .. ", body: " .. res.body
+    end
+
+    -- cjson.decode raises on invalid JSON instead of returning nil,
+    -- so guard it with pcall to keep the nil-plus-error contract
+    local ok, data = pcall(cjson.decode, res.body)
+    if not ok or not data then
+        return nil, "failed to decode response body: " .. res.body
+    end
+    return data, nil
+end
+
+
+return _M
diff --git a/t/plugin/loki-logger.t b/t/plugin/loki-logger.t
new file mode 100644
index 000000000..faa8749a9
--- /dev/null
+++ b/t/plugin/loki-logger.t
@@ -0,0 +1,308 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local test_cases = {
+                {endpoint_addrs = {"http://127.0.0.1:8199"}},
+                {endpoint_addrs = "http://127.0.0.1:8199"},
+                {endpoint_addrs = {}},
+                {},
+                {endpoint_addrs = {"http://127.0.0.1:8199"}, endpoint_uri = "/loki/api/v1/push"},
+                {endpoint_addrs = {"http://127.0.0.1:8199"}, endpoint_uri = 1234},
+                {endpoint_addrs = {"http://127.0.0.1:8199"}, tenant_id = 1234},
+                {endpoint_addrs = {"http://127.0.0.1:8199"}, log_labels = "1234"},
+                {endpoint_addrs = {"http://127.0.0.1:8199"}, log_labels = {job = "apisix6"}},
+            }
+            local plugin = require("apisix.plugins.loki-logger")
+
+            for _, case in ipairs(test_cases) do
+                local ok, err = plugin.check_schema(case)
+                ngx.say(ok and "done" or err)
+            end
+        }
+    }
+--- response_body
+done
+property "endpoint_addrs" validation failed: wrong type: expected array, got string
+property "endpoint_addrs" validation failed: expect array to have at least 1 items
+property "endpoint_addrs" is required
+done
+property "endpoint_uri" validation failed: wrong type: expected string, got number
+property "tenant_id" validation failed: wrong type: expected string, got number
+property "log_labels" validation failed: wrong type: expected object, got string
+done
+
+
+
+=== TEST 2: setup route
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "loki-logger": {
+                            "endpoint_addrs": ["http://127.0.0.1:3100"],
+                            "tenant_id": "tenant_1",
+                            "batch_max_size": 1
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 3: hit route
+--- request
+GET /hello
+--- more_headers
+test-header: only-for-test#1
+--- response_body
+hello world
+
+
+
+=== TEST 4: check loki log
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require("cjson")
+            local now = ngx.now() * 1000
+            local data, err = require("lib.grafana_loki").fetch_logs_from_loki(
+                tostring(now - 3000) .. "000000", -- from
+                tostring(now) .. "000000"         -- to
+            )
+
+            assert(err == nil, "fetch logs error: " .. (err or ""))
+            assert(data.status == "success", "loki response error: " .. cjson.encode(data))
+            assert(#data.data.result > 0, "loki log empty: " .. cjson.encode(data))
+
+            local entry = data.data.result[1]
+            assert(entry.stream.request_headers_test_header == "only-for-test#1",
+                  "expected field request_headers_test_header value: " .. cjson.encode(entry))
+        }
+    }
+--- error_code: 200
+
+
+
+=== TEST 5: setup route (with log_labels)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "loki-logger": {
+                            "endpoint_addrs": ["http://127.0.0.1:3100"],
+                            "tenant_id": "tenant_1",
+                            "log_labels": {
+                                "custom_label": "custom_label_value"
+                            },
+                            "batch_max_size": 1
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: hit route
+--- request
+GET /hello
+--- more_headers
+test-header: only-for-test#2
+--- response_body
+hello world
+
+
+
+=== TEST 7: check loki log (with custom_label)
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require("cjson")
+            local now = ngx.now() * 1000
+            local data, err = require("lib.grafana_loki").fetch_logs_from_loki(
+                tostring(now - 3000) .. "000000", -- from
+                tostring(now) .. "000000",        -- to
+                { query = [[{custom_label="custom_label_value"} | json]] }
+            )
+
+            assert(err == nil, "fetch logs error: " .. (err or ""))
+            assert(data.status == "success", "loki response error: " .. cjson.encode(data))
+            assert(#data.data.result > 0, "loki log empty: " .. cjson.encode(data))
+
+            local entry = data.data.result[1]
+            assert(entry.stream.request_headers_test_header == "only-for-test#2",
+                  "expected field request_headers_test_header value: " .. cjson.encode(entry))
+        }
+    }
+--- error_code: 200
+
+
+
+=== TEST 8: setup route (with tenant_id)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "loki-logger": {
+                            "endpoint_addrs": ["http://127.0.0.1:3100"],
+                            "tenant_id": "tenant_2",
+                            "batch_max_size": 1
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: hit route
+--- request
+GET /hello
+--- more_headers
+test-header: only-for-test#3
+--- response_body
+hello world
+
+
+
+=== TEST 10: check loki log (with tenant_id tenant_1)
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require("cjson")
+            local now = ngx.now() * 1000
+            local data, err = require("lib.grafana_loki").fetch_logs_from_loki(
+                tostring(now - 10000) .. "000000", -- from
+                tostring(now) .. "000000"          -- to
+            )
+
+            assert(err == nil, "fetch logs error: " .. (err or ""))
+            assert(data.status == "success", "loki response error: " .. cjson.encode(data))
+            assert(#data.data.result > 0, "loki log empty: " .. cjson.encode(data))
+
+            local entry = data.data.result[1]
+            assert(entry.stream.request_headers_test_header ~= "only-for-test#3",
+                  "expected field request_headers_test_header value: " .. cjson.encode(entry))
+        }
+    }
+--- error_code: 200
+
+
+
+=== TEST 11: check loki log (with tenant_id tenant_2)
+--- config
+    location /t {
+        content_by_lua_block {
+            local cjson = require("cjson")
+            local now = ngx.now() * 1000
+            local data, err = require("lib.grafana_loki").fetch_logs_from_loki(
+                tostring(now - 3000) .. "000000", -- from
+                tostring(now) .. "000000",        -- to
+                { headers = {
+                        ["X-Scope-OrgID"] = "tenant_2"
+                } }
+            )
+
+            assert(err == nil, "fetch logs error: " .. (err or ""))
+            assert(data.status == "success", "loki response error: " .. cjson.encode(data))
+            assert(#data.data.result > 0, "loki log empty: " .. cjson.encode(data))
+
+            local entry = data.data.result[1]
+            assert(entry.stream.request_headers_test_header == "only-for-test#3",
+                  "expected field request_headers_test_header value: " .. cjson.encode(entry))
+        }
+    }
+--- error_code: 200