You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/02/23 08:01:30 UTC

[skywalking-nginx-lua] branch master updated: Finish the basic codes.

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-nginx-lua.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eff887  Finish the basic codes.
8eff887 is described below

commit 8eff88726a2a22371ab6be38b55d9af0e14e7e9f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 23 16:01:14 2020 +0800

    Finish the basic codes.
---
 README.md                   |  61 +++++++++++++++++++-
 examples/nginx.conf         | 101 +++++++++------------------------
 lib/skywalking/client.lua   | 135 +++++++++++++++++++++++++++++++++++++-------
 lib/skywalking/register.lua |  10 ++++
 lib/skywalking/tracer.lua   |  77 ++++++++++++++++++++++++-
 5 files changed, 288 insertions(+), 96 deletions(-)

diff --git a/README.md b/README.md
index 5713cc7..6e02a24 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,50 @@ This agent follows the SkyWalking tracing and header protocol. It reports tracin
 All HTTP 1.1 requests go through Nginx could be collected by this agent.
 
 # Setup Doc
-TODO
+```
+http {
+    lua_package_path "/Path/to/.../skywalking-nginx-lua/lib/skywalking/?.lua;;";
+
+    # Buffer represents the register inform and the queue of the finished segment 
+    lua_shared_dict tracing_buffer 100m;
+    
+    # Init is the timer setter and keeper
+    # Setup an infinite loop timer to do register and trace report.
+    init_worker_by_lua_block {
+        local metadata_buffer = ngx.shared.tracing_buffer
+
+        -- Set service name
+        metadata_buffer:set('serviceName', 'User Service Name')
+        -- Instance means the number of Nginx deloyment, does not mean the worker instances
+        metadata_buffer:set('serviceInstanceName', 'User Service Instance Name')
+
+        require("client"):startBackendTimer("http://127.0.0.1:8080")
+    }
+
+    server {
+        listen 8080;
+
+        location /ingress {
+            default_type text/html;
+
+            rewrite_by_lua_block {
+                require("tracer"):start()
+            }
+
+            -- Target upstream service
+            proxy_pass http://127.0.0.1:8080/backend;
+
+            body_filter_by_lua_block {
+                require("tracer"):finish()
+            }
+
+            log_by_lua_block {
+                require("tracer"):prepareForReport()
+            }
+        }
+    }
+}
+```
 
 # Set up dev env
 All codes in the `lib/skywalking` require the `*_test.lua` to do the UnitTest. To run that, you need to install
@@ -30,6 +73,22 @@ This LUA tracing lib is originally designed for Nginx+LUA/OpenResty ecosystems.
 If you just use this in the Ngnix, [Setup Doc](#setup-doc) should be good enough.
 The following APIs are for developers or using this lib out of the Nginx case.
 
+## Nginx APIs
+- **startTimer**, `require("client"):startBackendTimer("http://127.0.0.1:8080")`. Start the backend timer. This timer register the metadata and report traces to the backend.
+- **start**, `require("tracer"):start()`. Begin the tracing before the upstream begin.
+- **finish**, `require("tracer"):finish()`. Finish the tracing for this HTTP request.
+- **prepareForReport**, `require("tracer"):prepareForReport()`. Prepare the finished segment for further report.
+
+## Tracing APIs at LUA level
+**TracingContext** is the entrance API for lua level tracing.
+- `TracingContext:new(serviceId, serviceInstID)`, create an active tracing context.
+- `TracingContext:newNoOP()`, create a no OP tracing context.
+- `TracingContext:drainAfterFinished()`, fetch the segment includes all finished spans.
+
+Create 2 kinds of span
+- `TracingContext:createEntrySpan(operationName, parent, contextCarrier)`
+- `TracingContext:createExitSpan(operationName, parent, peer, contextCarrier)`
+
 
 # Download
 Have no release yet.
diff --git a/examples/nginx.conf b/examples/nginx.conf
index 01d06c1..c5b8f6f 100644
--- a/examples/nginx.conf
+++ b/examples/nginx.conf
@@ -33,10 +33,12 @@ http {
     # Setup an infinite loop timer to do register and trace report.
     init_worker_by_lua_block {
         local metadata_buffer = ngx.shared.tracing_buffer
-        metadata_buffer['serviceName'] = 'Service Name'
-        metadata_buffer['serviceInstanceName'] = 'Service Instance Name'
 
-        require("client"):startTimer(metadata_buffer, "http://127.0.0.1:8080/skywalking")
+        metadata_buffer:set('serviceName', 'User Service Name')
+        -- Instance means the number of Nginx deloyment, does not mean the worker instances
+        metadata_buffer:set('serviceInstanceName', 'User Service Instance Name')
+
+        require("client"):startBackendTimer("http://127.0.0.1:8080")
     }
 
     server {
@@ -49,84 +51,17 @@ http {
             default_type text/html;
 
             rewrite_by_lua_block {
-                local TC = require('tracing_context')
-                local Layer = require('span_layer')
-
-                local metadata_buffer = ngx.shared.tracing_buffer
-                -- Mock the service instance id
-                metadata_buffer['serviceId'] = 1
-                metadata_buffer['serviceInstId'] = 1
-
-                local tracingContext
-                if metadata_buffer['serviceInstId'] ~= nil then
-                    tracingContext = TC:new(metadata_buffer['serviceId'], metadata_buffer['serviceInstId'])
-                else
-                    tracingContext = TC:newNoOP()
-                end
-
-                -- Constant pre-defined in SkyWalking main repo
-                -- 84 represents Nginx
-                local nginxComponentId = 6000
-
-                local contextCarrier = {}
-                contextCarrier["sw6"] = ngx.req.get_headers()["sw6"]
-                local entrySpan = tracingContext:createEntrySpan(ngx.var.uri, nil, contextCarrier)
-                entrySpan:start(ngx.now() * 1000)
-                entrySpan:setComponentId(nginxComponentId)
-                entrySpan:setLayer(Layer.HTTP)
-                
-                entrySpan:tag('http.method', ngx.req.get_method())
-                entrySpan:tag('http.params', ngx.var.scheme .. '://' .. ngx.var.host .. ngx.var.request_uri )
-                
-                contextCarrier = {}
-                -- Use the same URI to represent incoming and forwarding requests
-                -- Change it if you need.
-                local upstreamUri = ngx.var.uri
-                ------------------------------------------------------
-                -- NOTICE, this should be changed manually
-                -- This variable represents the upstream logic address
-                -- Please set them as service logic name or DNS name
-                ------------------------------------------------------
-                local upstreamServerName = "upstream_ip:port"
-                ------------------------------------------------------
-                local exitSpan = tracingContext:createExitSpan(upstreamUri, entrySpan, upstreamServerName, contextCarrier)
-                exitSpan:start(ngx.now() * 1000)
-                exitSpan:setComponentId(nginxComponentId)
-                exitSpan:setLayer(Layer.HTTP)
-                
-                for name, value in pairs(contextCarrier) do
-                    ngx.req.set_header(name, value)
-                end
-
-                -- Push the data in the context
-                ngx.ctx.tracingContext = tracingContext
-                ngx.ctx.entrySpan = entrySpan
-                ngx.ctx.exitSpan = exitSpan
+                require("tracer"):startBackendTimer()
             }
 
             proxy_pass http://127.0.0.1:8080/backend;
 
             body_filter_by_lua_block {
-                -- Finish the exit span when received the first response package from upstream
-                if ngx.ctx.exitSpan ~= nil then
-                    ngx.ctx.exitSpan:finish(ngx.now() * 1000)
-                    ngx.ctx.exitSpan = nil
-                end
+                require("tracer"):finish()
             }
 
             log_by_lua_block {
-                if ngx.ctx.entrySpan ~= nil then
-                    ngx.ctx.entrySpan:finish(ngx.now() * 1000)
-                    local status, segment = ngx.ctx.tracingContext:drainAfterFinished()
-                    if status then
-                        local segmentJson = require('cjson').encode(segment:transform())
-                        ngx.log(ngx.DEBUG, 'segment = ' .. segmentJson)
-
-                        local queue = ngx.shared.tracing_buffer
-                        local length = queue:lpush('segment', segmentJson)
-                        ngx.log(ngx.DEBUG, 'segment buffer size = ' .. queue:llen('segment'))
-                    end
-                end
+                require("tracer"):prepareForReport()
             }
         }
 
@@ -176,5 +111,25 @@ http {
                 ngx.say(cjson.encode(registeredInfo))
             }
         }
+
+        location /v2/instance/heartbeat {
+            default_type text/html;
+            lua_need_request_body on;
+
+            content_by_lua_block {
+                local cjson = require('cjson')
+                --ngx.log(ngx.DEBUG, 'Service instance ping request = ', ngx.req.get_body_data())
+            }
+        }
+
+        location /v2/segments {
+            default_type text/html;
+            lua_need_request_body on;
+
+            content_by_lua_block {
+                local cjson = require('cjson')
+                ngx.log(ngx.DEBUG, 'Received segment = ', ngx.req.get_body_data())
+            }
+        }
     }
 }
\ No newline at end of file
diff --git a/lib/skywalking/client.lua b/lib/skywalking/client.lua
index 6a663fc..ebe0b11 100644
--- a/lib/skywalking/client.lua
+++ b/lib/skywalking/client.lua
@@ -19,7 +19,9 @@ local Client = {}
 
 -- Tracing timer does the service and instance register
 -- After register successfully, it sends traces and heart beat
-function Client:startTimer(metadata_buffer, backend_http_uri)
+function Client:startBackendTimer(backend_http_uri)
+    local metadata_buffer = ngx.shared.tracing_buffer
+
     -- The codes of timer setup is following the OpenResty timer doc
     local delay = 3  -- in seconds
     local new_timer = ngx.timer.at
@@ -30,19 +32,25 @@ function Client:startTimer(metadata_buffer, backend_http_uri)
 
     check = function(premature)
         if not premature then
-            if metadata_buffer['serviceId'] == nil then
+            local serviceId = metadata_buffer:get('serviceId')
+            if serviceId == nil then
                 self:registerService(metadata_buffer, backend_http_uri)
             end
 
             -- Register is in the async way, if register successfully, go for instance register
-            if metadata_buffer['serviceId'] ~= nil then
-                if metadata_buffer['serviceInstId'] == nil then
+            serviceId = metadata_buffer:get('serviceId')
+            if serviceId ~= nil then
+                local serviceInstId = metadata_buffer:get('serviceInstId')
+                if serviceInstId == nil then
                     self:registerServiceInstance(metadata_buffer, backend_http_uri)
                 end
             end
 
             -- After all register successfully, begin to send trace segments
-            if metadata_buffer['serviceInstId'] ~= nil then
+            local serviceInstId = metadata_buffer:get('serviceInstId')
+            if serviceInstId ~= nil then
+                self:reportTraces(metadata_buffer, backend_http_uri)
+                self:ping(metadata_buffer, backend_http_uri)
             end
 
             -- do the health check
@@ -67,8 +75,9 @@ end
 function Client:registerService(metadata_buffer, backend_http_uri)
     local log = ngx.log
     local DEBUG = ngx.DEBUG
+    local ERR = ngx.ERR
 
-    local serviceName = metadata_buffer['serviceName']
+    local serviceName = metadata_buffer:get('serviceName')
     
     local cjson = require('cjson')
     local serviceRegister = require("register"):newServiceRegister(serviceName)
@@ -84,7 +93,9 @@ function Client:registerService(metadata_buffer, backend_http_uri)
         },
     })
 
-    if #res.body > 0 then
+    if not res then
+        log(ERR, "Service register fails, " .. err)
+    elseif res.status == 200 then
         log(DEBUG, "Service register response = " .. res.body)
         local registerResults = cjson.decode(res.body)
 
@@ -93,9 +104,11 @@ function Client:registerService(metadata_buffer, backend_http_uri)
             if result.key == serviceName then
                 local serviceId = result.value 
                 log(DEBUG, "Service registered, service id = " .. serviceId)
-                metadata_buffer['serviceId'] = serviceId
+                metadata_buffer:set('serviceId', serviceId)
             end
         end
+    else
+        log(ERR, "Service register fails, response code " .. res.status)
     end
 end
 
@@ -103,12 +116,14 @@ end
 function Client:registerServiceInstance(metadata_buffer, backend_http_uri)
     local log = ngx.log
     local DEBUG = ngx.DEBUG
+    local ERR = ngx.ERR
 
-    local serviceInstName = 'name:' .. metadata_buffer['serviceInstanceName']
+    local serviceInstName = 'name:' .. metadata_buffer:get('serviceInstanceName')
+    metadata_buffer:set('serviceInstanceUUID', serviceInstName)
 
     local cjson = require('cjson')
     local serviceInstanceRegister = require("register"):newServiceInstanceRegister(
-        metadata_buffer['serviceId'], 
+        metadata_buffer:get('serviceId'), 
         serviceInstName, 
         ngx.now() * 1000)
     local serviceInstanceRegisterParam = cjson.encode(serviceInstanceRegister)
@@ -123,18 +138,100 @@ function Client:registerServiceInstance(metadata_buffer, backend_http_uri)
         },
     })
 
-    if #res.body > 0 then
-        log(DEBUG, "Service Instance register response = " .. res.body)
-        local registerResults = cjson.decode(res.body)
+    if err == nil then
+        if res.status == 200 then
+            log(DEBUG, "Service Instance register response = " .. res.body)
+            local registerResults = cjson.decode(res.body)
+
+            for i, result in ipairs(registerResults)
+            do
+                if result.key == serviceInstName then
+                    local serviceId = result.value 
+                    log(DEBUG, "Service Instance registered, service instance id = " .. serviceId)
+                    metadata_buffer:set('serviceInstId', serviceId)
+                end
+            end
+        else
+            log(ERR, "Service Instance register fails, response code " .. res.status)
+        end
+    else
+        log(ERR, "Service Instance register fails, " .. err)
+    end
+end
 
-        for i, result in ipairs(registerResults)
-        do
-            if result.key == serviceInstName then
-                local serviceId = result.value 
-                log(DEBUG, "Service Instance registered, service instance id = " .. serviceId)
-                metadata_buffer['serviceInstId'] = serviceId
+-- Ping the backend to update instance heartheat
+function Client:ping(metadata_buffer, backend_http_uri)
+    local log = ngx.log
+    local DEBUG = ngx.DEBUG
+    local ERR = ngx.ERR
+
+    local cjson = require('cjson')
+    local pingPkg = require("register"):newServiceInstancePingPkg(
+        metadata_buffer:get('serviceInstId'), 
+        metadata_buffer:get('serviceInstanceUUID'), 
+        ngx.now() * 1000)
+    local pingPkgParam = cjson.encode(pingPkg)
+
+    local http = require('resty.http')
+    local httpc = http.new()
+    local res, err = httpc:request_uri(backend_http_uri .. '/v2/instance/heartbeat', {
+        method = "POST",
+        body = pingPkgParam,
+        headers = {
+            ["Content-Type"] = "application/json",
+        },
+    })
+
+    if err == nil then
+        if res.status ~= 200 then
+            log(ERR, "Agent ping fails, response code " .. res.status)
+        end
+    else
+        log(ERR, "Agent ping fails, " .. err)
+    end
+end
+
+-- Report trace segments to the backend
+function Client:reportTraces(metadata_buffer, backend_http_uri)
+    local log = ngx.log
+    local DEBUG = ngx.DEBUG
+    local ERR = ngx.ERR
+
+    local queue = ngx.shared.tracing_buffer
+    local segment = queue:rpop('segment')
+
+    local count = 0;
+
+    local http = require('resty.http')
+    local httpc = http.new()
+
+    while segment ~= nil
+    do
+        local res, err = httpc:request_uri(backend_http_uri .. '/v2/segments', {
+            method = "POST",
+            body = segment,
+            headers = {
+                ["Content-Type"] = "application/json",
+            },
+        })
+        
+        if err == nil then
+            if res.status ~= 200 then
+                log(ERR, "Segment report fails, response code " .. res.status)
+                break
+            else
+                count = count + 1
             end
+        else
+            log(ERR, "Segment report fails, " .. err)
+            break
         end
+
+        segment = queue:rpop('segment')
+    end
+
+    if count > 0 then
+        log(ERR, count,  " segments reported.")
     end
 end
 
diff --git a/lib/skywalking/register.lua b/lib/skywalking/register.lua
index b6645bf..988552f 100644
--- a/lib/skywalking/register.lua
+++ b/lib/skywalking/register.lua
@@ -53,4 +53,14 @@ function Register:newServiceInstanceRegister(registeredServiceId, serviceInstUUI
     return serviceInstances
 end
 
+function Register:newServiceInstancePingPkg(registeredServiceInstId, serviceInstUUID, updateTime)
+    local serviceInstancePingPkg = {
+        serviceInstanceId = registeredServiceInstId,
+        time = updateTime,
+        serviceInstanceUUID = serviceInstUUID,
+    }
+
+    return serviceInstancePingPkg
+end
+
 return Register
\ No newline at end of file
diff --git a/lib/skywalking/tracer.lua b/lib/skywalking/tracer.lua
index 2d8c3a8..7eceaff 100644
--- a/lib/skywalking/tracer.lua
+++ b/lib/skywalking/tracer.lua
@@ -17,13 +17,84 @@
 
 local Tracer = {}
 
-function Tracer.new()
+function Tracer:startBackendTimer()
+    local metadata_buffer = ngx.shared.tracing_buffer
+    local TC = require('tracing_context')
+    local Layer = require('span_layer')
+
+    local tracingContext
+    local serviceName = metadata_buffer:get("serviceName")
+    local serviceInstId = metadata_buffer:get("serviceInstId")
+    local serviceId = metadata_buffer:get('serviceId')
+    if serviceInstId ~= nil then
+        tracingContext = TC:new(serviceId, serviceInstId)
+    else
+        tracingContext = TC:newNoOP()
+    end
+
+    -- Constant pre-defined in SkyWalking main repo
+    -- 84 represents Nginx
+    local nginxComponentId = 6000
+
+    local contextCarrier = {}
+    contextCarrier["sw6"] = ngx.req.get_headers()["sw6"]
+    local entrySpan = tracingContext:createEntrySpan(ngx.var.uri, nil, contextCarrier)
+    entrySpan:start(ngx.now() * 1000)
+    entrySpan:setComponentId(nginxComponentId)
+    entrySpan:setLayer(Layer.HTTP)
+    
+    entrySpan:tag('http.method', ngx.req.get_method())
+    entrySpan:tag('http.params', ngx.var.scheme .. '://' .. ngx.var.host .. ngx.var.request_uri )
+    
+    contextCarrier = {}
+    -- Use the same URI to represent incoming and forwarding requests
+    -- Change it if you need.
+    local upstreamUri = ngx.var.uri
+    ------------------------------------------------------
+    -- NOTICE, this should be changed manually
+    -- This variable represents the upstream logic address
+    -- Please set them as service logic name or DNS name
+    --
+    -- TODO, currently, we can't have the upstream real network address
+    ------------------------------------------------------
+    local upstreamServerName = serviceName .. "-nginx:upstream_ip:port"
+    ------------------------------------------------------
+    local exitSpan = tracingContext:createExitSpan(upstreamUri, entrySpan, upstreamServerName, contextCarrier)
+    exitSpan:start(ngx.now() * 1000)
+    exitSpan:setComponentId(nginxComponentId)
+    exitSpan:setLayer(Layer.HTTP)
     
+    for name, value in pairs(contextCarrier) do
+        ngx.req.set_header(name, value)
+    end
+
+    -- Push the data in the context
+    ngx.ctx.tracingContext = tracingContext
+    ngx.ctx.entrySpan = entrySpan
+    ngx.ctx.exitSpan = exitSpan
+end
+
+function Tracer:finish()
+    -- Finish the exit span when received the first response package from upstream
+    if ngx.ctx.exitSpan ~= nil then
+        ngx.ctx.exitSpan:finish(ngx.now() * 1000)
+        ngx.ctx.exitSpan = nil
+    end
 end
 
+function Tracer:prepareForReport()
+    if ngx.ctx.entrySpan ~= nil then
+        ngx.ctx.entrySpan:finish(ngx.now() * 1000)
+        local status, segment = ngx.ctx.tracingContext:drainAfterFinished()
+        if status then
+            local segmentJson = require('cjson').encode(segment:transform())
+            ngx.log(ngx.DEBUG, 'segment = ' .. segmentJson)
 
-function Tracer.createEntrySpan(context, operationName, sw6_header_value)
-    
+            local queue = ngx.shared.tracing_buffer
+            local length = queue:lpush('segment', segmentJson)
+            ngx.log(ngx.DEBUG, 'segment buffer size = ' .. queue:llen('segment'))
+        end
+    end
 end
 
 return Tracer
\ No newline at end of file