Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2021/11/10 15:52:57 UTC

[GitHub] [apisix] dmsolr opened a new pull request #5478: feat: provide skywalking logger plugin

dmsolr opened a new pull request #5478:
URL: https://github.com/apache/apisix/pull/5478


   ### What this PR does / why we need it:
   
   ### Pre-submission checklist:
   
   
   * [x] Did you explain what problem does this PR solve? Or what new features have been added?
   * [x] Have you added corresponding test cases?
   * [x] Have you modified the corresponding document?
   * [x] Is this PR backward compatible? **If it is not backward compatible, please discuss on the [mailing list](https://github.com/apache/apisix/tree/master#community) first**
   
   Provides a new logger plugin that sends access logs to SkyWalking in the SkyWalking log format. If the SkyWalking trace plugin is enabled, or a request carries a SkyWalking trace context header, this plugin parses the header and includes the trace context in the log, so that the access log can be correlated with the trace.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747584069



##########
File path: docs/en/latest/plugins/skywalking-logger.md
##########
@@ -0,0 +1,117 @@
+---
+title: http-logger

Review comment:
       fixed

##########
File path: t/plugin/skywalking-logger.t
##########
@@ -0,0 +1,285 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 1986;
+        server_tokens off;
+
+        location /v3/logs {
+            content_by_lua_block {
+                local core = require("apisix.core")
+
+                core.log.warn(core.json.encode(core.request.get_body(), true))
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);

Review comment:
       updated







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747295479



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       Makes sense to me. @spacewander, what do you think?







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747183619



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])

Review comment:
       I updated the comment. The 4th part (`3` in the example) is the span ID.
   ```
   4. Parent span ID. Must be an integer. It begins with 0. This span ID points to the parent span in parent trace segment.
   ```
   Ref [SkyWalking Cross Process Propagation Headers Protocol](https://github.com/apache/skywalking/blob/master/docs/en/protocols/Skywalking-Cross-Process-Propagation-Headers-Protocol-v3.md)
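
   To make the field positions concrete, here is a minimal pure-Lua sketch of splitting an `sw8` value and reading parts 2 to 4. It is not the plugin's code: `b64decode` and `parse_sw8` are hypothetical helpers, the standard Base64 alphabet is an assumption, and a missing or short header is assumed to mean "no trace context".

   ```lua
   -- Sketch only: b64decode and parse_sw8 are hypothetical helpers,
   -- not APISIX or OpenResty APIs.
   local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

   -- Decode a standard Base64 string (padding is ignored).
   local function b64decode(s)
       local clean = s:gsub("[^%w%+/]", "")
       local out, bits, nbits = {}, 0, 0
       for i = 1, #clean do
           local v = B64:find(clean:sub(i, i), 1, true) - 1
           bits = bits * 64 + v
           nbits = nbits + 6
           if nbits >= 8 then
               nbits = nbits - 8
               -- take the top 8 bits, keep the remainder for the next round
               out[#out + 1] = string.char(math.floor(bits / 2 ^ nbits))
               bits = bits % 2 ^ nbits
           end
       end
       return table.concat(out)
   end

   -- Return a trace context table, or nil when the header is absent
   -- or has fewer than four '-'-separated parts.
   local function parse_sw8(header)
       if type(header) ~= "string" then
           return nil
       end
       local parts = {}
       for part in header:gmatch("[^%-]+") do
           parts[#parts + 1] = part
       end
       if #parts < 4 then
           return nil
       end
       return {
           traceId = b64decode(parts[2]),      -- 2nd part: trace ID
           traceSegment = b64decode(parts[3]), -- 3rd part: parent segment ID
           spanId = tonumber(parts[4]),        -- 4th part: parent span ID
       }
   end
   ```

   Returning nil for a request without `sw8` lets the caller skip the trace context instead of failing on a nil header value.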







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747928385



##########
File path: docs/en/latest/plugins/skywalking-logger.md
##########
@@ -0,0 +1,117 @@
+---
+title: skywalking-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Metadata**](#metadata)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`skywalking-logger` is a plugin which push Access Log data to `SkyWalking OAP` server over HTTP.

Review comment:
       ```suggestion
   `skywalking-logger` is a plugin that pushes access log data to the `SkyWalking OAP` server over HTTP. If a tracing context exists, it sets up the trace-log correlation automatically, relying on the [SkyWalking Cross Process Propagation Headers Protocol](https://skywalking.apache.org/docs/main/latest/en/protocols/skywalking-cross-process-propagation-headers-protocol-v3/).
   ```
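
   As a rough illustration of what "sets up the trace-log correlation" could look like, the sketch below builds a log entry and attaches a trace context only when one exists. The entry field names follow the plugin code in this PR; `build_entry` itself and its parameters are hypothetical, and the access log is assumed to be already encoded as a JSON string.

   ```lua
   -- Sketch only: build_entry is a hypothetical helper, not part of APISIX.
   -- conf mirrors the plugin schema (service_name, service_instance_name).
   local function build_entry(conf, uri, log_json, trace_context)
       local entry = {
           body = { json = { json = log_json } },
           service = conf.service_name,
           serviceInstance = conf.service_instance_name,
           endpoint = uri,
       }
       -- Correlate with a trace only when a context was actually parsed.
       if trace_context then
           entry.traceContext = trace_context
       end
       return entry
   end
   ```

   With this shape, a request without a trace header still yields a valid entry; it simply carries no `traceContext` field.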







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747346243



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       If it is only about cleaning up the stale buffers, then this is APISIX's call. It has nothing to do with SkyWalking or observability preferences.
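
   For reference, a minimal sketch of the stale-buffer sweep being discussed. It assumes `buffers` is keyed by something other than consecutive integers (for example the plugin conf table; the assignment is not shown in this diff), in which case `pairs` is needed because `ipairs` only visits keys 1..n. `sweep_stale` is a hypothetical name; the batch field names mirror the diff.

   ```lua
   -- Sketch only: sweep_stale is a hypothetical helper, not APISIX code.
   -- Removes batch processors that have nothing buffered and nothing in flight.
   local function sweep_stale(buffers)
       local removed = 0
       for key, batch in pairs(buffers) do
           if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
               -- Setting an existing field to nil during pairs() traversal is allowed.
               buffers[key] = nil
               removed = removed + 1
           end
       end
       return removed
   end
   ```

   Such a sweep would typically run from a periodic timer; the 30-minute interval is the value mentioned in the diff comment, not something this sketch depends on.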







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747270929



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       Every 30 minutes? What is this for?

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       I think this adds a huge latency in low-traffic scenarios. No one is willing to wait 30 minutes for logs to be collected.
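
       For context, here is a rough sketch of what the 30-minute timer callback in the quoted hunk appears to do. It is not part of the PR. It assumes the timer only runs `remove_stale_objects`, which drops batch objects that have nothing queued, and that log delivery itself is driven by the batch processor settings (`inactive_timeout`, `buffer_duration`); the scheduling line is not shown in this hunk, so that is an assumption. The `buffers` contents and the conf tables below are made up for illustration, and the sketch iterates with `pairs` (the quoted diff uses `ipairs`) because the keys here are tables rather than consecutive integers.

```lua
-- Hypothetical stand-in for the plugin's `buffers` table: one batch
-- object per plugin conf. Field names follow the quoted diff.
local buffers = {}

-- Drop every batch object that has nothing queued or in flight.
-- `pairs` is used because the keys are conf tables, not 1..n integers.
local function remove_stale_objects(bufs)
    local removed = 0
    for key, batch in pairs(bufs) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            bufs[key] = nil   -- clearing an existing key during traversal is allowed
            removed = removed + 1
        end
    end
    return removed
end

-- Two made-up confs: one idle, one with a pending log entry.
local idle_conf = {endpoint_addr = "http://127.0.0.1:1986"}
local busy_conf = {endpoint_addr = "http://127.0.0.1:1987"}
buffers[idle_conf] = {entry_buffer = {entries = {}}, batch_to_process = {}}
buffers[busy_conf] = {entry_buffer = {entries = {"log line"}}, batch_to_process = {}}

local removed = remove_stale_objects(buffers)
print("removed stale batch objects:", removed)
```

       If that reading is right, the 30 minutes only bounds how long an idle batch object stays in memory, not how long a log entry waits before it is sent.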

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')

Review comment:
       And all of the following decoding (twice, for the trace ID and the span ID) and splitting consume unnecessary resources.
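
       A minimal sketch of guarding the work, so that nothing is split or decoded unless an `sw8` header is actually present and well formed. This is an illustration, not the PR's code: `parse_sw8` is a hypothetical helper name, `decode_base64` is a small pure-Lua stand-in for `ngx.base64` so the snippet runs outside OpenResty, and `string.gmatch` replaces `ngx_re.split` for the same reason. The sample header is the one used in `t/plugin/skywalking-logger.t`, and the field name `traceSegmentId` follows that test's expected output.

```lua
local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

-- Pure-Lua base64 decoder (stand-in for ngx.base64, for illustration only).
local function decode_base64(s)
    s = s:gsub("=+$", "")                 -- strip padding
    local bits, nbits, out = 0, 0, {}
    for i = 1, #s do
        local idx = B64:find(s:sub(i, i), 1, true)
        if not idx then
            return nil, "invalid base64 character"
        end
        bits = bits * 64 + (idx - 1)
        nbits = nbits + 6
        if nbits >= 8 then
            nbits = nbits - 8
            out[#out + 1] = string.char(math.floor(bits / 2 ^ nbits))
            bits = bits % 2 ^ nbits
        end
    end
    return table.concat(out)
end

-- Returns a trace context table, or nil when the header is absent or malformed.
local function parse_sw8(header)
    if type(header) ~= "string" then
        return nil                        -- no header: no splitting, no decoding
    end
    -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
    local ids = {}
    for part in header:gmatch("[^-]+") do
        ids[#ids + 1] = part
    end
    if #ids ~= 8 then
        return nil
    end
    local trace_id = decode_base64(ids[2])
    local segment_id = decode_base64(ids[3])
    local span_id = tonumber(ids[4])
    if not trace_id or not segment_id or not span_id then
        return nil
    end
    return {traceId = trace_id, traceSegmentId = segment_id, spanId = span_id}
end

local sample = "1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1"
    .. "-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1"
    .. "-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==-dXBzdHJlYW0gc2VydmljZQ=="
local ctx = parse_sw8(sample)
print(ctx.traceId, ctx.spanId)
```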







[GitHub] [apisix] spacewander merged pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander merged pull request #5478:
URL: https://github.com/apache/apisix/pull/5478


   





[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r748113899



##########
File path: t/plugin/skywalking-logger.t
##########
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 1986;
+        server_tokens off;
+
+        location /v3/logs {
+            content_by_lua_block {
+                local core = require("apisix.core")
+
+                core.log.warn(core.json.encode(core.request.get_body(), true))
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1"})
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 2: full schema check
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1",
+                                                 timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 3: uri is missing
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "endpoint_addr" is required
+done
+
+
+
+=== TEST 4: add plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "skywalking-logger": {
+                                "endpoint_addr": "http://127.0.0.1:1986",
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "skywalking-logger": {
+                                    "endpoint_addr": "http://127.0.0.1:1986",
+                                    "batch_max_size": 1,
+                                    "max_retry_count": 1,
+                                    "retry_delay": 2,
+                                    "buffer_duration": 2,
+                                    "inactive_timeout": 2
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1982": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/opentracing"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: access local server
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[skywalking logger] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: test trace context header
+--- request
+GET /opentracing
+--- more_headers
+sw8: 1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==-dXBzdHJlYW0gc2VydmljZQ==
+--- response_body
+opentracing
+--- error_log eval
+qr/.*\\\"traceContext\\\":\{(\\\"traceSegmentId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"traceId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"spanId\\\":1|,){5}\}.*/
+--- wait: 0.5
+
+
+
+=== TEST 7: test wrong trace context header
+--- request
+GET /opentracing

Review comment:
       Ah, I just noticed this: why `/opentracing` rather than `/request`?







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747926130



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,242 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    local trace_context
+    local sw_header = ngx.req.get_headers()["sw8"]
+    if sw_header then
+        -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(sw_header, '-')
+        if #ids == 8 then
+            local trace_id, err = base64.decode_base64url(ids[2])

Review comment:
       `trace_context` is optional. If `trace_id` is missing, the log cannot be related to a trace, but a log without a `trace_id` can still be found in the `log` view.
   
   Refer to https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto#L99-L107
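
   A small sketch of what "optional" could look like when building the log entry, assuming the entry shape shown in the quoted diff. `build_entry` is a hypothetical helper and the conf values are placeholders; the point is only that `traceContext` is left out entirely when no trace ID is available.

```lua
-- Build a SkyWalking log entry; attach traceContext only when a trace ID exists.
-- `build_entry` is a made-up name for illustration, not a function from the PR.
local function build_entry(conf, uri, log_json, trace_context)
    local entry = {
        body = {json = {json = log_json}},
        service = conf.service_name,
        serviceInstance = conf.service_instance_name,
        endpoint = uri,
    }
    if trace_context and trace_context.traceId then
        entry.traceContext = trace_context
    end
    return entry
end

local conf = {service_name = "APISIX", service_instance_name = "APISIX Instance Name"}

-- Request without an sw8 header: the entry carries no trace context.
local plain = build_entry(conf, "/opentracing", "{}", nil)

-- Request with a parsed sw8 header: the entry can be related to the trace.
local traced = build_entry(conf, "/opentracing", "{}", {
    traceId = "ae709769-6e20-4c68-9733-10f455216cc5",
    traceSegmentId = "ae709769-6e20-4c68-9733-10f455216cc5",
    spanId = 1,
})

print(plain.traceContext, traced.traceContext.traceId)
```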







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747212507



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])

Review comment:
       got it







[GitHub] [apisix] spacewander commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747347136



##########
File path: t/plugin/skywalking-logger.t
##########
@@ -0,0 +1,285 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 1986;
+        server_tokens off;
+
+        location /v3/logs {
+            content_by_lua_block {
+                local core = require("apisix.core")
+
+                core.log.warn(core.json.encode(core.request.get_body(), true))
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);

Review comment:
       New tests should define the common sections once, like https://github.com/apache/apisix/blob/f06f6cc0e8e41875bf105cf2c18457339002df53/t/plugin/limit-count2.t#L38
   
   There is no need to repeat them in each test.

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,242 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    local trace_context
+    local sw_header = ngx.req.get_headers()["sw8"]
+    if sw_header then
+        -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(sw_header, '-')
+        if #ids == 8 then
+            local trace_id, err = base64.decode_base64url(ids[2])

Review comment:
       `decode_base64url` only returns str or nil
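
A minimal sketch of the defensive parsing discussed here, written in plain Lua so that it runs outside OpenResty. `b64_decode` and `split_dash` are hypothetical stand-ins for `ngx.base64.decode_base64url` and `ngx_re.split`, not the real APIs, and `parse_sw8` is an illustrative name. The only behaviour relied on is that a failed decode yields nil as its first return value, so a nil check is enough to reject a bad segment.

```lua
local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"

-- Hypothetical stand-in decoder: returns the decoded string, or nil on bad input.
local function b64_decode(s)
    if type(s) ~= "string" then
        return nil
    end
    s = s:gsub("=+$", "")              -- drop trailing padding
    local bits, nbits, out = 0, 0, {}
    for i = 1, #s do
        local c = s:sub(i, i)
        local v
        if c == "+" or c == "-" then
            v = 62
        elseif c == "/" or c == "_" then
            v = 63
        else
            local pos = B64:find(c, 1, true)
            if not pos then
                return nil             -- character outside the alphabet
            end
            v = pos - 1
        end
        bits = bits * 64 + v
        nbits = nbits + 6
        if nbits >= 8 then
            nbits = nbits - 8
            local byte = math.floor(bits / 2 ^ nbits)
            out[#out + 1] = string.char(byte)
            bits = bits - byte * 2 ^ nbits
        end
    end
    return table.concat(out)
end

-- Hypothetical stand-in for splitting the header on '-'.
local function split_dash(s)
    local parts = {}
    for piece in (s .. "-"):gmatch("([^%-]*)%-") do
        parts[#parts + 1] = piece
    end
    return parts
end

-- Returns a trace context table, or nil plus a reason when the header is malformed.
local function parse_sw8(header)
    local ids = split_dash(header)
    if #ids ~= 8 then
        return nil, "expected 8 segments, got " .. #ids
    end
    local trace_id = b64_decode(ids[2])
    local segment_id = b64_decode(ids[3])
    local span_id = tonumber(ids[4])
    if not trace_id or not segment_id or not span_id then
        return nil, "invalid segment value"
    end
    return {traceId = trace_id, traceSegmentId = segment_id, spanId = span_id}
end

local ctx = parse_sw8("1-QVBJU0lY-QVBJU0lY-3-QVBJU0lY-QVBJU0lY-QVBJU0lY-QVBJU0lY")
print(ctx.traceId, ctx.spanId)
```

Returning nil plus a reason lets the caller log the failure and continue with an empty trace context instead of raising an error in the log phase.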







[GitHub] [apisix] spacewander commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747342726



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       Err, @dmsolr's explanation is incorrect. This timer doesn't clean up stale logs. It cleans up stale buffers once they are no longer holding any log entries.
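
A rough sketch of that behaviour in plain Lua: a buffer is dropped only when it holds no pending entries. The field names mirror the quoted diff, but `remove_stale_buffers` and the sample data are hypothetical. The sketch assumes the buffers are keyed by the plugin conf table, as the log message in the quoted diff suggests, and under that assumption the traversal has to use `pairs`, since `ipairs` only walks integer keys 1..n.

```lua
-- Maps a conf table to its batch-processor-like object (shape assumed from the diff).
local buffers = {}

-- Drop every buffer that holds no pending entries; return how many were removed.
local function remove_stale_buffers()
    local removed = 0
    -- pairs, not ipairs: the keys are conf tables, not consecutive integers.
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            buffers[key] = nil   -- clearing an existing key during traversal is allowed
            removed = removed + 1
        end
    end
    return removed
end

local busy_conf, idle_conf = {name = "busy"}, {name = "idle"}
buffers[busy_conf] = {entry_buffer = {entries = {"log line"}}, batch_to_process = {}}
buffers[idle_conf] = {entry_buffer = {entries = {}}, batch_to_process = {}}

print(remove_stale_buffers())
```

The buffer that still holds a log line survives the sweep; only the empty one is released.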







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747278452



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')

Review comment:
       Your call. AFAIK, SkyWalking won't change the v3 format for a long time (it has already been stable for years, and I expect more). `namespace` would be an issue, but not a very serious one, just duplicated configuration.
   I just think resource cost is very sensitive here. And this is not that risky: if an incompatibility happens, it shows up the first time both plugins are activated. So anyone would notice it immediately and could fix it or disable one or both plugins quickly.







[GitHub] [apisix] membphis commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
membphis commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r748105270



##########
File path: t/plugin/skywalking-logger.t
##########
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 1986;
+        server_tokens off;
+
+        location /v3/logs {
+            content_by_lua_block {
+                local core = require("apisix.core")
+
+                core.log.warn(core.json.encode(core.request.get_body(), true))
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1"})
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 2: full schema check
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1",
+                                                 timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 3: uri is missing
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "endpoint_addr" is required
+done
+
+
+
+=== TEST 4: add plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "skywalking-logger": {
+                                "endpoint_addr": "http://127.0.0.1:1986",
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "skywalking-logger": {
+                                    "endpoint_addr": "http://127.0.0.1:1986",
+                                    "batch_max_size": 1,
+                                    "max_retry_count": 1,
+                                    "retry_delay": 2,
+                                    "buffer_duration": 2,
+                                    "inactive_timeout": 2
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1982": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/opentracing"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: access local server
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[skywalking logger] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: test trace context header
+--- request
+GET /opentracing
+--- more_headers
+sw8: 1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==-dXBzdHJlYW0gc2VydmljZQ==
+--- response_body
+opentracing
+--- error_log eval
+qr/.*\\\"traceContext\\\":\{(\\\"traceSegmentId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"traceId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"spanId\\\":1|,){5}\}.*/
+--- wait: 0.5
+
+
+
+=== TEST 7: test wrong trace context header
+--- request
+GET /opentracing
+--- more_headers
+sw8: 1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==
+--- response_body
+opentracing
+--- error_log eval
+qr/failed to parse trace_context header:/
+--- wait: 0.5
+
+
+
+=== TEST 8: add plugin metadata
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/skywalking-logger',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "host": "$host",
+                        "@timestamp": "$time_iso8601",
+                        "client_ip": "$remote_addr"
+                    }
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "log_format": {
+                                "host": "$host",
+                                "@timestamp": "$time_iso8601",
+                                "client_ip": "$remote_addr"
+                            }
+                        }
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            ngx.status = code
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: access local server and test log format
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log eval
+qr/.*\{\\\"json\\\":\\\"\{(\\\\\\\"\@timestamp\\\\\\\":\\\\\\\".*\\\\\\\"|\\\\\\\"client_ip\\\\\\\":\\\\\\\"127\.0\.0\.1\\\\\\\"|\\\\\\\"host\\\\\\\":\\\\\\\"localhost\\\\\\\"|\\\\\\\"route_id\\\\\\\":\\\\\\\"1\\\\\\\"|,){7}\}/

Review comment:
       This is not easy to read. Is there a more readable way to write it? @dmsolr







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747270123



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')

Review comment:
       Are you sure you want to do this? SkyWalking already has a tracer for APISIX, and it is integrated. Why not read the trace context from it instead?
   
   Decoding the header is fine from a technical perspective, but it could become a risk if the Lua agent one day supports the `namespace` concept.
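
For reference, a minimal sketch of the header decoding under discussion, written in plain Lua with no OpenResty dependencies. The helper names `b64_decode` and `parse_sw8` are hypothetical, and the sketch assumes the eight dash-separated fields listed in the plugin's own comment, with base64-encoded IDs; the plugin itself relies on `ngx.re` and `ngx.base64` rather than on code like this.

```lua
local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

-- Decode standard base64; returns nil plus an error message on bad input.
local function b64_decode(s)
    if #s % 4 ~= 0 or s:find("[^%w%+/=]") then
        return nil, "invalid base64 input"
    end
    s = s:gsub("=+$", "")  -- drop trailing padding
    local out, acc, bits = {}, 0, 0
    for i = 1, #s do
        local idx = B64:find(s:sub(i, i), 1, true)
        if not idx then
            return nil, "invalid base64 input"
        end
        acc = acc * 64 + (idx - 1)
        bits = bits + 6
        if bits >= 8 then
            bits = bits - 8
            local byte = math.floor(acc / 2 ^ bits)
            out[#out + 1] = string.char(byte)
            acc = acc - byte * 2 ^ bits
        end
    end
    return table.concat(out)
end

-- Assumed layout (hypothetical helper, not the plugin's code):
-- SAMPLE-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
local function parse_sw8(header)
    local fields = {}
    for part in header:gmatch("[^-]+") do
        fields[#fields + 1] = part
    end
    if #fields ~= 8 then
        return nil, "expected 8 fields, got " .. #fields
    end

    local trace_id, err = b64_decode(fields[2])
    if not trace_id then
        return nil, "trace id: " .. err
    end

    local segment_id
    segment_id, err = b64_decode(fields[3])
    if not segment_id then
        return nil, "segment id: " .. err
    end

    local span_id = tonumber(fields[4])
    if not span_id then
        return nil, "span id is not a number"
    end

    return {traceId = trace_id, traceSegmentId = segment_id, spanId = span_id}
end

-- Usage with the header from the test file in this PR.
local header = "1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1"
    .. "-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1"
    .. "-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==-dXBzdHJlYW0gc2VydmljZQ=="
local ctx, parse_err = parse_sw8(header)
print(ctx and ctx.traceId or parse_err)
```

Because the parser depends on the exact header layout, any change to the propagation format would have to be mirrored here, which is the maintenance risk raised above.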







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747405003



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,242 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    local trace_context
+    local sw_header = ngx.req.get_headers()["sw8"]
+    if sw_header then
+        -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(sw_header, '-')
+        if #ids == 8 then
+            local trace_id, err = base64.decode_base64url(ids[2])

Review comment:
       If so, we don't need to check the result of `decode_base64url`, right?







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747300138



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')

Review comment:
       Got it.
   However, skywalking-nginx-lua does not expose the trace context yet. I would like to get this PR merged first and keep polishing afterwards.
   We need to enhance skywalking-nginx-lua and release a new version. After that, I will come back to polish this feature.







[GitHub] [apisix] moonming commented on pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
moonming commented on pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#issuecomment-966006766


   @wu-sheng please take a look; feedback is welcome :)





[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r748112458



##########
File path: t/plugin/skywalking-logger.t
##########
@@ -0,0 +1,269 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    my $http_config = $block->http_config // <<_EOC_;
+
+    server {
+        listen 1986;
+        server_tokens off;
+
+        location /v3/logs {
+            content_by_lua_block {
+                local core = require("apisix.core")
+
+                core.log.warn(core.json.encode(core.request.get_body(), true))
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1"})
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 2: full schema check
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({endpoint_addr = "http://127.0.0.1",
+                                                 timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+
+
+
+=== TEST 3: uri is missing
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.skywalking-logger")
+            local ok, err = plugin.check_schema({timeout = 3,
+                                                 name = "skywalking-logger",
+                                                 max_retry_count = 2,
+                                                 retry_delay = 2,
+                                                 buffer_duration = 2,
+                                                 inactive_timeout = 2,
+                                                 batch_max_size = 500,
+                                                 })
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "endpoint_addr" is required
+done
+
+
+
+=== TEST 4: add plugin
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "skywalking-logger": {
+                                "endpoint_addr": "http://127.0.0.1:1986",
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "skywalking-logger": {
+                                    "endpoint_addr": "http://127.0.0.1:1986",
+                                    "batch_max_size": 1,
+                                    "max_retry_count": 1,
+                                    "retry_delay": 2,
+                                    "buffer_duration": 2,
+                                    "inactive_timeout": 2
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1982": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/opentracing"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: access local server
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log
+Batch Processor[skywalking logger] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 6: test trace context header
+--- request
+GET /opentracing
+--- more_headers
+sw8: 1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==-dXBzdHJlYW0gc2VydmljZQ==
+--- response_body
+opentracing
+--- error_log eval
+qr/.*\\\"traceContext\\\":\{(\\\"traceSegmentId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"traceId\\\":\\\"ae709769-6e20-4c68-9733-10f455216cc5\\\"|\\\"spanId\\\":1|,){5}\}.*/
+--- wait: 0.5
+
+
+
+=== TEST 7: test wrong trace context header
+--- request
+GET /opentracing
+--- more_headers
+sw8: 1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-YWU3MDk3NjktNmUyMC00YzY4LTk3MzMtMTBmNDU1MjE2Y2M1-1-QVBJU0lY-QVBJU0lYIEluc3RhbmNlIE5hbWU=-L2dldA==
+--- response_body
+opentracing
+--- error_log eval
+qr/failed to parse trace_context header:/
+--- wait: 0.5
+
+
+
+=== TEST 8: add plugin metadata
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/skywalking-logger',
+                ngx.HTTP_PUT,
+                [[{
+                    "log_format": {
+                        "host": "$host",
+                        "@timestamp": "$time_iso8601",
+                        "client_ip": "$remote_addr"
+                    }
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "log_format": {
+                                "host": "$host",
+                                "@timestamp": "$time_iso8601",
+                                "client_ip": "$remote_addr"
+                            }
+                        }
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            ngx.status = code
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: access local server and test log format
+--- request
+GET /opentracing
+--- response_body
+opentracing
+--- error_log eval
+qr/.*\{\\\"json\\\":\\\"\{(\\\\\\\"\@timestamp\\\\\\\":\\\\\\\".*\\\\\\\"|\\\\\\\"client_ip\\\\\\\":\\\\\\\"127\.0\.0\.1\\\\\\\"|\\\\\\\"host\\\\\\\":\\\\\\\"localhost\\\\\\\"|\\\\\\\"route_id\\\\\\\":\\\\\\\"1\\\\\\\"|,){7}\}/

Review comment:
       This is the raw SkyWalking log format.
   It seems there are ways to simplify it. I will give it a try later.
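
One possible simplification, sketched in plain Lua: check each expected field on its own with a literal substring search instead of one large alternation. The sample `line` and the helper `has_field` are hypothetical; the line only approximates the doubly escaped JSON that the regex in TEST 9 matches.

```lua
-- Hypothetical log line, shaped like the escaped output matched in TEST 9.
local line = [[{\"json\":\"{\\\"host\\\":\\\"localhost\\\",\\\"client_ip\\\":\\\"127.0.0.1\\\",\\\"route_id\\\":\\\"1\\\"}\"}]]

-- Return true when the escaped key/value pair occurs anywhere in the line.
local function has_field(log_line, key, value)
    local quote = [[\\\"]]  -- an inner JSON quote as it appears in the log
    local needle = quote .. key .. quote .. ":" .. quote .. value .. quote
    return log_line:find(needle, 1, true) ~= nil  -- plain search, no patterns
end

-- Field order does not matter, which is what the alternation achieved.
local expected = {host = "localhost", client_ip = "127.0.0.1", route_id = "1"}
for key, value in pairs(expected) do
    assert(has_field(line, key, value), "missing field: " .. key)
end
```

The same idea should carry over to the test file as several short patterns, one per field, since the `--- error_log eval` section of Test::Nginx should also accept a list of patterns rather than a single regex.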







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747292059



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       I don't think this is a very common way to process logs. Usually, unless you have a local cache or mmap (like SkyWalking Satellite), logs should be abandoned quickly once the retry threshold is reached.
   The reason is that observability produces a large data set that isn't always very useful, especially for logs. Metrics are small, so keeping their latest values is tolerable, but logs are different.
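
   The "abandon once the retry threshold is reached" policy described above can be sketched as follows. This is only an illustration under assumptions: `flush_with_retry` and the `send` callback are hypothetical names, there is no delay between attempts, and the real plugin delegates batching and retries to `apisix.utils.batch-processor` rather than to code like this.

```lua
-- Hypothetical helper: try to send a batch and drop it once the retry
-- budget is used up, instead of keeping it in memory for a later sweep.
-- max_retry_count = 0 means a single attempt and no retries.
local function flush_with_retry(batch, send, max_retry_count)
    local attempts = 0
    while true do
        attempts = attempts + 1
        local ok, err = send(batch)
        if ok then
            return true, attempts
        end
        if attempts > max_retry_count then
            -- Retry threshold reached: give up and let the batch be collected.
            return false, attempts, err
        end
    end
end

-- Usage with a sender that always fails (stands in for an unreachable OAP server).
local function always_fail()
    return false, "connection refused"
end

local ok, attempts, err = flush_with_retry({"log line"}, always_fail, 2)
print(ok, attempts, err)
```

   With this shape nothing outlives the call: a batch is either delivered or discarded, so no periodic cleanup timer is needed for failed data.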







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747212395



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),

Review comment:
       Updated.

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"

Review comment:
       removed







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747292059



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       I don't think this is a very common way to process logs. Usually, unless you have a local cache or mmap (like SkyWalking Satellite), logs should be abandoned quickly once the retry threshold is reached.
   The reason is that observability produces a large data set that isn't always very useful, especially for logs. Metrics are small, so keeping their latest values is tolerable, but logs are different.







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747281996



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,
+    }
+
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present

Review comment:
       It sweeps the stale log data every 30 minutes.
   When data transport fails, the data remains in memory, so regular cleanup is required.
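
A minimal sketch of the sweep described above, in plain Lua. The `buffers` layout (conf tables as keys, each value holding `entry_buffer.entries` and `batch_to_process`) is an assumption inferred from the quoted code, and `sweep_stale` is a hypothetical name. As quoted, the check only drops processors whose queues are both empty. The sketch iterates with `pairs`, because `ipairs` only visits consecutive integer keys starting at 1 and would not visit table keys at all.

```lua
-- Hypothetical helper: drop batch processors that hold no pending data.
-- `buffers` is assumed to map a conf table to its batch processor object.
local function sweep_stale(buffers)
    local removed = 0
    -- pairs (not ipairs) so that non-integer keys such as conf tables are visited
    for key, batch in pairs(buffers) do
        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
            -- clearing an existing field during traversal is permitted in Lua
            buffers[key] = nil
            removed = removed + 1
        end
    end
    return removed
end

-- Stand-in data: one idle processor and one with a pending entry.
local conf_a, conf_b = {name = "a"}, {name = "b"}
local buffers = {
    [conf_a] = {entry_buffer = {entries = {}}, batch_to_process = {}},
    [conf_b] = {entry_buffer = {entries = {"pending"}}, batch_to_process = {}},
}
local removed = sweep_stale(buffers)
```

In the plugin itself such a function would be the body of the timer callback, after the `premature` check.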







[GitHub] [apisix] spacewander commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747179436



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])
+        }
+    else
+        trace_context = {}
+    end
+
+    local entry = {
+        traceContext = trace_context,
+        body = {
+            json = {
+                json = core.json.encode(log_body, true)
+            }
+        },
+        service = conf.service_name,
+        serviceInstance = conf.service_instance_name,
+        endpoint = ngx.var.uri,

Review comment:
       ```suggestion
           endpoint = ctx.var.uri,
   ```

##########
File path: docs/en/latest/plugins/skywalking-logger.md
##########
@@ -0,0 +1,117 @@
+---
+title: http-logger
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## Summary
+
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Metadata**](#metadata)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`skywalking-logger` is a plugin which push Access Log data to `SkyWalking OAP` server over HTTP.
+
+This will provide the ability to send Access Log as JSON objects to `SkyWalking OAP` server.
+
+## Attributes
+
+| Name             | Type    | Requirement | Default       | Valid   | Description                                                                              |
+| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- |
+| endpoint_addr    | string  | required    |               |         | The URI of the `SkyWalking OAP` server.                                                  |
+| service_name     | string  | optional    | "APISIX"      |         | service name for SkyWalking reporter.                                                    |
+| service_instance_name | string  | optional   |"APISIX Instance Name" |    | service instance name for SkyWalking reporter,  set it to `$hostname` to get local hostname directly.|
+| timeout          | integer | optional    | 3             | [1,...] | Time to keep the connection alive after sending a request.                               |
+| name             | string  | optional    | "skywalking logger" |         | A unique identifier to identity the logger.                                              |
+| batch_max_size   | integer | optional    | 1000          | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `SkyWalking OAP` server. |
+| inactive_timeout | integer | optional    | 5             | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `SkyWalking OAP` server regardless of whether the number of logs in the buffer reaches the maximum number set. |
+| buffer_duration  | integer | optional    | 60            | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.|
+| max_retry_count  | integer | optional    | 0             | [0,...] | Maximum number of retries before removing from the processing pipe line.                 |
+| retry_delay      | integer | optional    | 1             | [0,...] | Number of seconds the process execution should be delayed if the execution fails.        |
+| include_req_body | boolean | optional    | false         | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. |
+
+## How To Enable
+
+The following is an example of how to enable the `skywalking-logger` for a specific route. Before that, an available `SkyWalking OAP` server was required and accessible.
+
+```shell
+curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+      "plugins": {
+            "http-logger": {
+                "endpoint_addr": "http://127.0.0.1:12800"
+            }
+       },
+      "upstream": {
+           "type": "roundrobin",
+           "nodes": {
+               "127.0.0.1:1980": 1
+           }
+      },
+      "uri": "/hello"
+}'
+```
+
+## Test Plugin
+
+> success:
+
+```shell
+$ curl -i http://127.0.0.1:9080/hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+Completion of the steps, could find the Log details on `SkyWalking UI`.
+
+## Metadata
+
+`skywalking-logger` is also support to custom log format like [http-logger](./http-logger.md).

Review comment:
       ```suggestion
   `skywalking-logger` also supports custom log formats, like [http-logger](./http-logger.md).
   ```

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),

Review comment:
       Need to check the length of ids and the result of decode_base64url
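
A minimal sketch of the checks requested above, written in plain Lua so it can run outside OpenResty. `parse_sw8` and `stub_decode` are hypothetical names. The decoder is passed in as an argument, and inside the plugin it would presumably be `base64.decode_base64url`, which returns `nil` on invalid input. The expected field count of 8 is taken from the sample header in the quoted code comment.

```lua
-- Hypothetical helper: parse an sw8 header defensively.
-- Returns a trace context table, or nil plus an error message.
local function parse_sw8(header, decode)
    if type(header) ~= "string" then
        return nil, "no sw8 header"
    end

    -- split on '-' (plain-Lua stand-in for ngx_re.split)
    local ids = {}
    for field in (header .. "-"):gmatch("([^-]*)-") do
        ids[#ids + 1] = field
    end
    if #ids ~= 8 then
        return nil, "malformed sw8 header"
    end

    local trace_id = decode(ids[2])
    local segment_id = decode(ids[3])
    local span_id = tonumber(ids[4])
    if not trace_id or not segment_id or not span_id then
        return nil, "invalid sw8 field"
    end

    return {traceId = trace_id, traceSegment = segment_id, spanId = span_id}
end

-- Stub decoder for illustration only: rejects empty or non-word input.
local function stub_decode(s)
    if s == "" or s:find("[^%w_]") then
        return nil, "invalid input"
    end
    return "decoded:" .. s
end

local ctx = parse_sw8("1-dHJhY2U-c2Vn-3-svc-inst-ep-addr", stub_decode)
```

When the function returns `nil`, the caller can fall back to an empty `traceContext` instead of raising an error in the log phase.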

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])

Review comment:
       Is the span ID missing from the comment?

##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"

Review comment:
       Is the `no-matched` value necessary for SkyWalking? If not, we can leave it empty.







[GitHub] [apisix] spacewander commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747197261



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')
+
+        trace_context = {
+            traceId = base64.decode_base64url(ids[2]),
+            traceSegment = base64.decode_base64url(ids[3]),
+            spanId = tonumber(ids[4])

Review comment:
       It would be better to also add a link to the SkyWalking docs here.







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747926580



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,242 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    local trace_context
+    local sw_header = ngx.req.get_headers()["sw8"]
+    if sw_header then
+        -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(sw_header, '-')
+        if #ids == 8 then
+            local trace_id, err = base64.decode_base64url(ids[2])

Review comment:
       For now, I prefer to still collect the log when the trace_context is missing.
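
       A minimal sketch of that behaviour, in plain Lua so it runs outside OpenResty: the log entry is always built, and the trace context is attached only when one was parsed. `build_entry` is a hypothetical helper, and the entry field names (`traceContext`, `body`, `service`, `serviceInstance`) are assumptions for illustration, not taken from the quoted diff.

```lua
-- Hypothetical helper: field names are illustrative assumptions.
-- `payload` stands in for the already-serialized access log.
local function build_entry(conf, payload, trace_context)
    return {
        -- stays nil when no usable sw8 header was present;
        -- the entry itself is still produced and can be batched
        traceContext = trace_context,
        body = payload,
        service = conf.service_name,
        serviceInstance = conf.service_instance_name,
    }
end

local conf = {
    service_name = "APISIX",
    service_instance_name = "APISIX Instance Name",
}

local with_trace = build_entry(conf, '{"status":200}',
                               {traceId = "trace", spanId = 3})
local without_trace = build_entry(conf, '{"status":200}', nil)
```

       With this shape, a request without an sw8 header still yields an entry; it simply cannot be correlated with a trace.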







[GitHub] [apisix] dmsolr commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
dmsolr commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747276180



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,236 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    if not log_body.route_id then
+        log_body.route_id = "no-matched"
+    end
+
+    local trace_context
+    local headers = ngx.req.get_headers()
+    if headers then
+        -- 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(headers["sw8"], '-')

Review comment:
       I am considering whether this should still work when the tracer plugin is not enabled.
   If we provide a version compatibility matrix, it does not look very dangerous.







[GitHub] [apisix] spacewander commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747920746



##########
File path: apisix/plugins/skywalking-logger.lua
##########
@@ -0,0 +1,242 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local batch_processor = require("apisix.utils.batch-processor")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local base64          = require("ngx.base64")
+local ngx_re          = require("ngx.re")
+
+local ngx      = ngx
+local tostring = tostring
+local tonumber = tonumber
+local ipairs   = ipairs
+local timer_at = ngx.timer.at
+
+local plugin_name = "skywalking-logger"
+local stale_timer_running = false
+local buffers = {}
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        service_name = {type = "string", default = "APISIX"},
+        service_instance_name = {type = "string", default = "APISIX Instance Name"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "skywalking logger"},
+        max_retry_count = {type = "integer", minimum = 0, default = 0},
+        retry_delay = {type = "integer", minimum = 0, default = 1},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
+        include_req_body = {type = "boolean", default = false},
+    },
+    required = {"endpoint_addr"},
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 408,
+    name = plugin_name,
+    schema = schema,
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr)
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = "/v3/logs",
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then
+            core.log.warn("removing batch processor stale object, conf: ",
+                          core.json.delay_encode(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+
+    local log_body
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        log_body = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        log_body = log_util.get_full_log(ngx, conf)
+    end
+
+    local trace_context
+    local sw_header = ngx.req.get_headers()["sw8"]
+    if sw_header then
+        -- 1-TRACEID-SEGMENTID-SPANID-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT
+        local ids = ngx_re.split(sw_header, '-')
+        if #ids == 8 then
+            local trace_id, err = base64.decode_base64url(ids[2])

Review comment:
       If I am correct, we should check whether trace_id is nil, since the trace_id is required.
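
       A minimal sketch of such a check, written in plain Lua so it runs outside OpenResty. The `split` and `decode_base64` helpers are stand-ins (assumptions) for `ngx.re.split` and `ngx.base64.decode_base64url` in the quoted diff, and `parse_sw8` is a hypothetical name. The result keys follow the first revision of the diff.

```lua
-- Stand-in for ngx.re.split: split `s` on a single-character separator.
local function split(s, sep)
    local parts = {}
    for field in (s .. sep):gmatch("(.-)%" .. sep) do
        parts[#parts + 1] = field
    end
    return parts
end

-- Stand-in for the base64 decoder: returns nil, err on bad input.
local B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
local B64_INDEX = {}
for i = 1, #B64 do
    B64_INDEX[B64:sub(i, i)] = i - 1
end

local function decode_base64(s)
    if type(s) ~= "string" or s == "" then
        return nil, "empty input"
    end
    s = s:gsub("=+$", "")
    local out, acc, bits = {}, 0, 0
    for i = 1, #s do
        local v = B64_INDEX[s:sub(i, i)]
        if not v then
            return nil, "invalid base64 character"
        end
        acc = acc * 64 + v
        bits = bits + 6
        if bits >= 8 then
            bits = bits - 8
            local div = 2 ^ bits
            local byte = math.floor(acc / div)
            out[#out + 1] = string.char(byte)
            acc = acc - byte * div
        end
    end
    return table.concat(out)
end

-- Hypothetical parser: every required field is checked before use.
local function parse_sw8(header)
    if not header then
        return nil, "no sw8 header"
    end
    local ids = split(header, "-")
    if #ids ~= 8 then
        return nil, "unexpected field count: " .. #ids
    end
    local trace_id, err = decode_base64(ids[2])
    if not trace_id then
        return nil, "bad trace id: " .. err
    end
    local segment_id
    segment_id, err = decode_base64(ids[3])
    if not segment_id then
        return nil, "bad segment id: " .. err
    end
    local span_id = tonumber(ids[4])
    if not span_id then
        return nil, "bad span id"
    end
    return {traceId = trace_id, traceSegment = segment_id, spanId = span_id}
end
```

       The caller can then log the error and continue without a trace context whenever `parse_sw8` returns nil.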







[GitHub] [apisix] wu-sheng commented on a change in pull request #5478: feat: provide skywalking logger plugin

Posted by GitBox <gi...@apache.org>.
wu-sheng commented on a change in pull request #5478:
URL: https://github.com/apache/apisix/pull/5478#discussion_r747408864



##########
File path: docs/en/latest/plugins/skywalking-logger.md
##########
@@ -0,0 +1,117 @@
+---
+title: http-logger

Review comment:
       Is this the right title?



