You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/06/03 14:33:38 UTC

[GitHub] [apisix] bisakhmondal commented on a diff in pull request #7099: init elasticsearch-logger.lua

bisakhmondal commented on code in PR #7099:
URL: https://github.com/apache/apisix/pull/7099#discussion_r889003250


##########
apisix/plugins/elasticsearch-logger.lua:
##########
@@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local bp_manager_mod  = require("apisix.utils.batch-processor-manager")
+local log_util        = require("apisix.utils.log-util")
+local core            = require("apisix.core")
+local http            = require("resty.http")
+local url             = require("net.url")
+local plugin          = require("apisix.plugin")
+
+local ngx      = ngx
+local tostring = tostring
+local encode_base64   = ngx.encode_base64
+
+local plugin_name = "elasticsearch-logger"
+local batch_processor_manager = bp_manager_mod.new(plugin_name)
+
+local schema = {
+    type = "object",
+    properties = {
+        endpoint_addr = core.schema.uri_def,
+        user = {type = "string", default = ""},
+        password = {type = "string", default = ""},
+        index = {type = "string", default = "apisix-elasticsearch-logger"},
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "elasticsearch logger"},
+        ssl_verify = {type = "boolean", default = true},
+    },
+    required = {"endpoint_addr", "user", "password", "index"}
+}
+
+
+local metadata_schema = {
+    type = "object",
+    properties = {
+        log_format = log_util.metadata_schema_log_format,
+    },
+}
+
+
+local _M = {
+    version = 0.1,
+    priority = 398,
+    name = plugin_name,
+    schema = batch_processor_manager:wrap_schema(schema),
+    metadata_schema = metadata_schema,
+}
+
+
+function _M.check_schema(conf, schema_type)
+    if schema_type == core.schema.TYPE_METADATA then
+        return core.schema.check(metadata_schema, conf)
+    end
+    return core.schema.check(schema, conf)
+end
+
+
+local function send_http_data(conf, log_message)
+    local err_msg
+    local res = true
+    local url_decoded = url.parse(conf.endpoint_addr)
+    local host = url_decoded.host
+    local port = url_decoded.port
+    local index = conf.index .. "-" .. os.date('%Y.%m.%d')
+
+    core.log.info("sending a batch logs to ", conf.endpoint_addr .. index)
+
+    if not port then
+        if url_decoded.scheme == "https" then
+            port = 443
+        else
+            port = 80
+        end
+    end
+
+    local httpc = http.new()
+    httpc:set_timeout(conf.timeout * 1000)
+    local ok, err = httpc:connect(host, port)
+
+    if not ok then
+        return false, "failed to connect to host[" .. host .. "] port["
+            .. tostring(port) .. "] " .. err
+    end
+
+    if url_decoded.scheme == "https" then
+        ok, err = httpc:ssl_handshake(true, host, conf.ssl_verify)
+        if not ok then
+            return false, "failed to perform SSL with host[" .. host .. "] "
+                .. "port[" .. tostring(port) .. "] " .. err
+        end
+    end
+
+    local url_path = index .. "/_doc"
+    local user_pass = conf.user .. ":" .. conf.password
+    user_pass = "Basic " .. encode_base64(user_pass)
+    local httpc_res, httpc_err = httpc:request({
+        method = "POST",
+        path = url_path,
+        query = url_decoded.query,
+        body = log_message,
+        headers = {
+            ["Host"] = url_decoded.host,
+            ["Content-Type"] = "application/json",
+            ["Authorization"] = user_pass,
+        }
+    })
+
+    if not httpc_res then
+        return false, "error while sending data to [" .. host .. "] port["
+            .. tostring(port) .. "] " .. httpc_err
+    end
+
+    -- some error occurred in the server
+    if httpc_res.status >= 400 then
+        res =  false
+        err_msg = "server returned status code[" .. httpc_res.status .. "] host["
+            .. host .. "] port[" .. tostring(port) .. "] "
+            .. "body[" .. httpc_res:read_body() .. "]"
+    end
+
+    return res, err_msg
+end
+
+
+function _M.log(conf, ctx)
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
+    local entry
+
+    if metadata and metadata.value.log_format
+       and core.table.nkeys(metadata.value.log_format) > 0
+    then
+        entry = log_util.get_custom_format_log(ctx, metadata.value.log_format)
+    else
+        entry = log_util.get_full_log(ngx, conf)
+    end
+
+    if batch_processor_manager:add_entry(conf, entry) then
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            local log_table = {}
+            for i = 1, #entries do
+                core.table.insert(log_table, core.json.encode(entries[i]))
+            end
+            data = core.table.concat(log_table, " ")  -- assemble multi items as string "{} {}"

Review Comment:
   Just curious, are the space-separated JSONs going to be treated as separate events?
   
   Else, just a suggestion - we can always use the elasticsearch bulk API (`endpoint/index/_bulk`). The indexing speed is much faster there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org