You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apisix.apache.org by 蔡兴 <dx...@163.com> on 2020/01/07 03:02:22 UTC

[DISCUSS] Add Log plugin with kafka

Hi, all,
I want to add a log plugin that produces logs through Kafka.

The general information is as follows:
1. Collect log information
2. Get kafka config 
3. Send kafka msg


A brief code presentation:

```
-- Kafka producer module, loaded under the name "resty.kafka.producer".
local producer = require "resty.kafka.producer"

--- Log handler sketch: collects request fields into a table, reads the
-- Kafka settings from the local configuration and sends one message through
-- an async producer.
-- NOTE(review): `_M`, `req`, `time`, `fetch_local_conf`, `clone_tab` and
-- `message` are not defined anywhere in this snippet; they must come from
-- the surrounding module.
-- @param conf plugin configuration (not read in this snippet)
-- @param ctx  request context; this code reads `ctx.var.*`, `ctx.entry`,
--             `ctx.request_uid` and `ctx.request_username`
 function _M.log(conf, ctx)
    -- Falls back to 0 when no upstream response time is available.
    local consume = ctx.var.upstream_response_time or 0
    -- Request parameters serialized as a query string.
    local par = ngx.encode_args(req.params())
    -- Cap the serialized parameters at 1000 bytes and mark the truncation.
    if string.len(par) > 1000 then
        par = string.sub(par,1,1000) .. '...'
    end

    -- Fields of the log record.
    local log_json = {}
    log_json["entry"] = ctx.entry
    -- Fallback values used when the context carries no uid / username;
    -- NOTE(review): the meaning of 2 and "odin_api" is not shown here.
    log_json["uid"] = ctx.request_uid or 2
    log_json["username"] = ctx.request_username or "odin_api"
    log_json["user_ip"] = ctx.var.remote_addr
    log_json["url"] = ctx.var.scheme .. '://' .. ctx.var.host .. ':' .. ctx.var.server_port .. ctx.var.uri
    log_json["route"] = ctx.var.uri
    log_json["host"] = ctx.var.upstream_host
    log_json["method"] = ngx.req.get_method()
    log_json["params"] = par
    -- Both durations are scaled by 1000; presumably seconds -> milliseconds
    -- (TODO confirm the unit of the underlying variables).
    log_json["consume"] = ctx.var.request_time * 1000
    log_json["upstream_consume"] = consume * 1000
    log_json["status"] = ngx.status
    log_json["logtime"] = time.now()
    -- More fields can be added to the record here.


    -- Read the Kafka settings from the local configuration.
    local local_conf, err = fetch_local_conf()
    if not local_conf then
        -- Returns three values, with the error in the third position.
        return nil, nil, err
    end

    -- Work on a copy of the `kafka` section of the local configuration.
    local opts = clone_tab(local_conf.kafka)
    local kafka_topic = opts.topic
    local broker_host = opts.broker_list_host
    local broker_port = opts.broker_list_port

    local topic = kafka_topic
    -- A single broker entry built from the configured host and port.
    local broker_list = {
        { host = broker_host, port = broker_port },
    }

    -- Create the producer with producer_type = "async".
    local p = producer:new(broker_list, { producer_type = "async" })

    -- Send the message with a nil key.
    -- NOTE(review): `message` is never assigned in this snippet, while
    -- `log_json` is built above but never used; presumably an encoded form
    -- of `log_json` is meant to be sent here -- confirm.
    local offset, err = p:send(topic, nil, message)
    if not offset then
        -- The send error in `err` is dropped without being logged.
        return
    end
    -- NOTE(review): the closing `end` of _M.log is not part of this snippet.

```
	





Re: [DISCUSS] Add Log plugin with kafka

Posted by YuanSheng Wang <me...@gmail.com>.
Welcome PR.
We can discuss it in PR for more details.

^_^

On Tue, Jan 7, 2020 at 11:09 AM Ming Wen <we...@apache.org> wrote:

> cool, the Apache Kafka plugin is really what we want.
> Looking forward to this PR.
>
> Thanks,
> Ming Wen, Apache APISIX
> Twitter: _WenMing
>
>
> 蔡兴 <dx...@163.com> 于2020年1月7日周二 上午11:02写道:
>
> > Hi, all,
> > I want to add a log plugin which produce log through kafka
> >
> > The general information is as follows:
> > 1. Collect log information
> > 2. Get kafka config
> > 3. Send kafka msg
> >
> >
> > A brief code presentation:
> >
> > ```
> > local producer = require "resty.kafka.producer"
> >
> >  function _M.log(conf, ctx)
> >     local consume = ctx.var.upstream_response_time or 0
> >     local par = ngx.encode_args(req.params())
> >     if string.len(par) > 1000 then
> >         par = string.sub(par,1,1000) .. '...'
> >     end
> >
> >     local log_json = {}
> >     log_json["entry"] = ctx.entry
> >     log_json["uid"] = ctx.request_uid or 2
> >     log_json["username"] = ctx.request_username or "odin_api"
> >     log_json["user_ip"] = ctx.var.remote_addr
> >     log_json["url"] = ctx.var.scheme .. '://' .. ctx.var.host .. ':' ..
> > ctx.var.server_port .. ctx.var.uri
> >     log_json["route"] = ctx.var.uri
> >     log_json["host"] = ctx.var.upstream_host
> >     log_json["method"] = ngx.req.get_method()
> >     log_json["params"] = par
> >     log_json["consume"] = ctx.var.request_time * 1000
> >     log_json["upstream_consume"] = consume * 1000
> >     log_json["status"] = ngx.status
> >     log_json["logtime"] = time.now()
> >     --- you can log more info
> >
> >
> >     --- get kafka confg
> >     local local_conf, err = fetch_local_conf()
> >     if not local_conf then
> >         return nil, nil, err
> >     end
> >
> >     local opts = clone_tab(local_conf.kafka)
> >     local kafka_topic = opts.topic
> >     local broker_host = opts.broker_list_host
> >     local broker_port = opts.broker_list_port
> >
> >     local topic = kafka_topic
> >     local broker_list = {
> >         { host = broker_host, port = broker_port },
> >     }
> >
> >     -- kafka producer
> >     local p = producer:new(broker_list, { producer_type = "async" })
> >
> >     --- send msg
> >     local offset, err = p:send(topic, nil, message)
> >     if not offset then
> >         return
> >     end
> >
> > ```
> >
> >
> >
> >
> >
> >
>


-- 

*MembPhis*
My github: https://github.com/membphis
Apache APISIX: https://github.com/apache/incubator-apisix

Re: [DISCUSS] Add Log plugin with kafka

Posted by Ming Wen <we...@apache.org>.
cool, the Apache Kafka plugin is really what we want.
Looking forward to this PR.

Thanks,
Ming Wen, Apache APISIX
Twitter: _WenMing


蔡兴 <dx...@163.com> 于2020年1月7日周二 上午11:02写道:

> Hi, all,
> I want to add a log plugin which produce log through kafka
>
> The general information is as follows:
> 1. Collect log information
> 2. Get kafka config
> 3. Send kafka msg
>
>
> A brief code presentation:
>
> ```
> local producer = require "resty.kafka.producer"
>
>  function _M.log(conf, ctx)
>     local consume = ctx.var.upstream_response_time or 0
>     local par = ngx.encode_args(req.params())
>     if string.len(par) > 1000 then
>         par = string.sub(par,1,1000) .. '...'
>     end
>
>     local log_json = {}
>     log_json["entry"] = ctx.entry
>     log_json["uid"] = ctx.request_uid or 2
>     log_json["username"] = ctx.request_username or "odin_api"
>     log_json["user_ip"] = ctx.var.remote_addr
>     log_json["url"] = ctx.var.scheme .. '://' .. ctx.var.host .. ':' ..
> ctx.var.server_port .. ctx.var.uri
>     log_json["route"] = ctx.var.uri
>     log_json["host"] = ctx.var.upstream_host
>     log_json["method"] = ngx.req.get_method()
>     log_json["params"] = par
>     log_json["consume"] = ctx.var.request_time * 1000
>     log_json["upstream_consume"] = consume * 1000
>     log_json["status"] = ngx.status
>     log_json["logtime"] = time.now()
>     --- you can log more info
>
>
>     --- get kafka confg
>     local local_conf, err = fetch_local_conf()
>     if not local_conf then
>         return nil, nil, err
>     end
>
>     local opts = clone_tab(local_conf.kafka)
>     local kafka_topic = opts.topic
>     local broker_host = opts.broker_list_host
>     local broker_port = opts.broker_list_port
>
>     local topic = kafka_topic
>     local broker_list = {
>         { host = broker_host, port = broker_port },
>     }
>
>     -- kafka producer
>     local p = producer:new(broker_list, { producer_type = "async" })
>
>     --- send msg
>     local offset, err = p:send(topic, nil, message)
>     if not offset then
>         return
>     end
>
> ```
>
>
>
>
>
>