You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2020/02/23 08:01:30 UTC
[skywalking-nginx-lua] branch master updated: Finish the basic
codes.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-nginx-lua.git
The following commit(s) were added to refs/heads/master by this push:
new 8eff887 Finish the basic codes.
8eff887 is described below
commit 8eff88726a2a22371ab6be38b55d9af0e14e7e9f
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Feb 23 16:01:14 2020 +0800
Finish the basic codes.
---
README.md | 61 +++++++++++++++++++-
examples/nginx.conf | 101 +++++++++------------------------
lib/skywalking/client.lua | 135 +++++++++++++++++++++++++++++++++++++-------
lib/skywalking/register.lua | 10 ++++
lib/skywalking/tracer.lua | 77 ++++++++++++++++++++++++-
5 files changed, 288 insertions(+), 96 deletions(-)
diff --git a/README.md b/README.md
index 5713cc7..6e02a24 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,50 @@ This agent follows the SkyWalking tracing and header protocol. It reports tracin
All HTTP 1.1 requests go through Nginx could be collected by this agent.
# Setup Doc
-TODO
+```
+http {
+ lua_package_path "/Path/to/.../skywalking-nginx-lua/lib/skywalking/?.lua;;";
+
+ # Buffer represents the register inform and the queue of the finished segment
+ lua_shared_dict tracing_buffer 100m;
+
+ # Init is the timer setter and keeper
+ # Setup an infinite loop timer to do register and trace report.
+ init_worker_by_lua_block {
+ local metadata_buffer = ngx.shared.tracing_buffer
+
+ -- Set service name
+ metadata_buffer:set('serviceName', 'User Service Name')
+ -- Instance means the number of Nginx deloyment, does not mean the worker instances
+ metadata_buffer:set('serviceInstanceName', 'User Service Instance Name')
+
+ require("client"):startBackendTimer("http://127.0.0.1:8080")
+ }
+
+ server {
+ listen 8080;
+
+ location /ingress {
+ default_type text/html;
+
+ rewrite_by_lua_block {
+ require("tracer"):start()
+ }
+
+ -- Target upstream service
+ proxy_pass http://127.0.0.1:8080/backend;
+
+ body_filter_by_lua_block {
+ require("tracer"):finish()
+ }
+
+ log_by_lua_block {
+ require("tracer"):prepareForReport()
+ }
+ }
+ }
+}
+```
# Set up dev env
All codes in the `lib/skywalking` require the `*_test.lua` to do the UnitTest. To run that, you need to install
@@ -30,6 +73,22 @@ This LUA tracing lib is originally designed for Nginx+LUA/OpenResty ecosystems.
If you just use this in the Ngnix, [Setup Doc](#setup-doc) should be good enough.
The following APIs are for developers or using this lib out of the Nginx case.
+## Nginx APIs
+- **startTimer**, `require("client"):startBackendTimer("http://127.0.0.1:8080")`. Start the backend timer. This timer register the metadata and report traces to the backend.
+- **start**, `require("tracer"):start()`. Begin the tracing before the upstream begin.
+- **finish**, `require("tracer"):finish()`. Finish the tracing for this HTTP request.
+- **prepareForReport**, `require("tracer"):prepareForReport()`. Prepare the finished segment for further report.
+
+## Tracing APIs at LUA level
+**TracingContext** is the entrance API for lua level tracing.
+- `TracingContext:new(serviceId, serviceInstID)`, create an active tracing context.
+- `TracingContext:newNoOP()`, create a no OP tracing context.
+- `TracingContext:drainAfterFinished()`, fetch the segment includes all finished spans.
+
+Create 2 kinds of span
+- `TracingContext:createEntrySpan(operationName, parent, contextCarrier)`
+- `TracingContext:createExitSpan(operationName, parent, peer, contextCarrier)`
+
# Download
Have no release yet.
diff --git a/examples/nginx.conf b/examples/nginx.conf
index 01d06c1..c5b8f6f 100644
--- a/examples/nginx.conf
+++ b/examples/nginx.conf
@@ -33,10 +33,12 @@ http {
# Setup an infinite loop timer to do register and trace report.
init_worker_by_lua_block {
local metadata_buffer = ngx.shared.tracing_buffer
- metadata_buffer['serviceName'] = 'Service Name'
- metadata_buffer['serviceInstanceName'] = 'Service Instance Name'
- require("client"):startTimer(metadata_buffer, "http://127.0.0.1:8080/skywalking")
+ metadata_buffer:set('serviceName', 'User Service Name')
+ -- Instance means the number of Nginx deloyment, does not mean the worker instances
+ metadata_buffer:set('serviceInstanceName', 'User Service Instance Name')
+
+ require("client"):startBackendTimer("http://127.0.0.1:8080")
}
server {
@@ -49,84 +51,17 @@ http {
default_type text/html;
rewrite_by_lua_block {
- local TC = require('tracing_context')
- local Layer = require('span_layer')
-
- local metadata_buffer = ngx.shared.tracing_buffer
- -- Mock the service instance id
- metadata_buffer['serviceId'] = 1
- metadata_buffer['serviceInstId'] = 1
-
- local tracingContext
- if metadata_buffer['serviceInstId'] ~= nil then
- tracingContext = TC:new(metadata_buffer['serviceId'], metadata_buffer['serviceInstId'])
- else
- tracingContext = TC:newNoOP()
- end
-
- -- Constant pre-defined in SkyWalking main repo
- -- 84 represents Nginx
- local nginxComponentId = 6000
-
- local contextCarrier = {}
- contextCarrier["sw6"] = ngx.req.get_headers()["sw6"]
- local entrySpan = tracingContext:createEntrySpan(ngx.var.uri, nil, contextCarrier)
- entrySpan:start(ngx.now() * 1000)
- entrySpan:setComponentId(nginxComponentId)
- entrySpan:setLayer(Layer.HTTP)
-
- entrySpan:tag('http.method', ngx.req.get_method())
- entrySpan:tag('http.params', ngx.var.scheme .. '://' .. ngx.var.host .. ngx.var.request_uri )
-
- contextCarrier = {}
- -- Use the same URI to represent incoming and forwarding requests
- -- Change it if you need.
- local upstreamUri = ngx.var.uri
- ------------------------------------------------------
- -- NOTICE, this should be changed manually
- -- This variable represents the upstream logic address
- -- Please set them as service logic name or DNS name
- ------------------------------------------------------
- local upstreamServerName = "upstream_ip:port"
- ------------------------------------------------------
- local exitSpan = tracingContext:createExitSpan(upstreamUri, entrySpan, upstreamServerName, contextCarrier)
- exitSpan:start(ngx.now() * 1000)
- exitSpan:setComponentId(nginxComponentId)
- exitSpan:setLayer(Layer.HTTP)
-
- for name, value in pairs(contextCarrier) do
- ngx.req.set_header(name, value)
- end
-
- -- Push the data in the context
- ngx.ctx.tracingContext = tracingContext
- ngx.ctx.entrySpan = entrySpan
- ngx.ctx.exitSpan = exitSpan
+ require("tracer"):startBackendTimer()
}
proxy_pass http://127.0.0.1:8080/backend;
body_filter_by_lua_block {
- -- Finish the exit span when received the first response package from upstream
- if ngx.ctx.exitSpan ~= nil then
- ngx.ctx.exitSpan:finish(ngx.now() * 1000)
- ngx.ctx.exitSpan = nil
- end
+ require("tracer"):finish()
}
log_by_lua_block {
- if ngx.ctx.entrySpan ~= nil then
- ngx.ctx.entrySpan:finish(ngx.now() * 1000)
- local status, segment = ngx.ctx.tracingContext:drainAfterFinished()
- if status then
- local segmentJson = require('cjson').encode(segment:transform())
- ngx.log(ngx.DEBUG, 'segment = ' .. segmentJson)
-
- local queue = ngx.shared.tracing_buffer
- local length = queue:lpush('segment', segmentJson)
- ngx.log(ngx.DEBUG, 'segment buffer size = ' .. queue:llen('segment'))
- end
- end
+ require("tracer"):prepareForReport()
}
}
@@ -176,5 +111,25 @@ http {
ngx.say(cjson.encode(registeredInfo))
}
}
+
+ location /v2/instance/heartbeat {
+ default_type text/html;
+ lua_need_request_body on;
+
+ content_by_lua_block {
+ local cjson = require('cjson')
+ --ngx.log(ngx.DEBUG, 'Service instance ping request = ', ngx.req.get_body_data())
+ }
+ }
+
+ location /v2/segments {
+ default_type text/html;
+ lua_need_request_body on;
+
+ content_by_lua_block {
+ local cjson = require('cjson')
+ ngx.log(ngx.DEBUG, 'Received segment = ', ngx.req.get_body_data())
+ }
+ }
}
}
\ No newline at end of file
diff --git a/lib/skywalking/client.lua b/lib/skywalking/client.lua
index 6a663fc..ebe0b11 100644
--- a/lib/skywalking/client.lua
+++ b/lib/skywalking/client.lua
@@ -19,7 +19,9 @@ local Client = {}
-- Tracing timer does the service and instance register
-- After register successfully, it sends traces and heart beat
-function Client:startTimer(metadata_buffer, backend_http_uri)
+function Client:startBackendTimer(backend_http_uri)
+ local metadata_buffer = ngx.shared.tracing_buffer
+
-- The codes of timer setup is following the OpenResty timer doc
local delay = 3 -- in seconds
local new_timer = ngx.timer.at
@@ -30,19 +32,25 @@ function Client:startTimer(metadata_buffer, backend_http_uri)
check = function(premature)
if not premature then
- if metadata_buffer['serviceId'] == nil then
+ local serviceId = metadata_buffer:get('serviceId')
+ if serviceId == nil then
self:registerService(metadata_buffer, backend_http_uri)
end
-- Register is in the async way, if register successfully, go for instance register
- if metadata_buffer['serviceId'] ~= nil then
- if metadata_buffer['serviceInstId'] == nil then
+ serviceId = metadata_buffer:get('serviceId')
+ if serviceId ~= nil then
+ local serviceInstId = metadata_buffer:get('serviceInstId')
+ if serviceInstId == nil then
self:registerServiceInstance(metadata_buffer, backend_http_uri)
end
end
-- After all register successfully, begin to send trace segments
- if metadata_buffer['serviceInstId'] ~= nil then
+ local serviceInstId = metadata_buffer:get('serviceInstId')
+ if serviceInstId ~= nil then
+ self:reportTraces(metadata_buffer, backend_http_uri)
+ self:ping(metadata_buffer, backend_http_uri)
end
-- do the health check
@@ -67,8 +75,9 @@ end
function Client:registerService(metadata_buffer, backend_http_uri)
local log = ngx.log
local DEBUG = ngx.DEBUG
+ local ERR = ngx.ERR
- local serviceName = metadata_buffer['serviceName']
+ local serviceName = metadata_buffer:get('serviceName')
local cjson = require('cjson')
local serviceRegister = require("register"):newServiceRegister(serviceName)
@@ -84,7 +93,9 @@ function Client:registerService(metadata_buffer, backend_http_uri)
},
})
- if #res.body > 0 then
+ if not res then
+ log(ERR, "Service register fails, " .. err)
+ elseif res.status == 200 then
log(DEBUG, "Service register response = " .. res.body)
local registerResults = cjson.decode(res.body)
@@ -93,9 +104,11 @@ function Client:registerService(metadata_buffer, backend_http_uri)
if result.key == serviceName then
local serviceId = result.value
log(DEBUG, "Service registered, service id = " .. serviceId)
- metadata_buffer['serviceId'] = serviceId
+ metadata_buffer:set('serviceId', serviceId)
end
end
+ else
+ log(ERR, "Service register fails, response code " .. res.status)
end
end
@@ -103,12 +116,14 @@ end
function Client:registerServiceInstance(metadata_buffer, backend_http_uri)
local log = ngx.log
local DEBUG = ngx.DEBUG
+ local ERR = ngx.ERR
- local serviceInstName = 'name:' .. metadata_buffer['serviceInstanceName']
+ local serviceInstName = 'name:' .. metadata_buffer:get('serviceInstanceName')
+ metadata_buffer:set('serviceInstanceUUID', serviceInstName)
local cjson = require('cjson')
local serviceInstanceRegister = require("register"):newServiceInstanceRegister(
- metadata_buffer['serviceId'],
+ metadata_buffer:get('serviceId'),
serviceInstName,
ngx.now() * 1000)
local serviceInstanceRegisterParam = cjson.encode(serviceInstanceRegister)
@@ -123,18 +138,100 @@ function Client:registerServiceInstance(metadata_buffer, backend_http_uri)
},
})
- if #res.body > 0 then
- log(DEBUG, "Service Instance register response = " .. res.body)
- local registerResults = cjson.decode(res.body)
+ if err == nil then
+ if res.status == 200 then
+ log(DEBUG, "Service Instance register response = " .. res.body)
+ local registerResults = cjson.decode(res.body)
+
+ for i, result in ipairs(registerResults)
+ do
+ if result.key == serviceInstName then
+ local serviceId = result.value
+ log(DEBUG, "Service Instance registered, service instance id = " .. serviceId)
+ metadata_buffer:set('serviceInstId', serviceId)
+ end
+ end
+ else
+ log(ERR, "Service Instance register fails, response code " .. res.status)
+ end
+ else
+ log(ERR, "Service Instance register fails, " .. err)
+ end
+end
- for i, result in ipairs(registerResults)
- do
- if result.key == serviceInstName then
- local serviceId = result.value
- log(DEBUG, "Service Instance registered, service instance id = " .. serviceId)
- metadata_buffer['serviceInstId'] = serviceId
+-- Ping the backend to update instance heartheat
+function Client:ping(metadata_buffer, backend_http_uri)
+ local log = ngx.log
+ local DEBUG = ngx.DEBUG
+ local ERR = ngx.ERR
+
+ local cjson = require('cjson')
+ local pingPkg = require("register"):newServiceInstancePingPkg(
+ metadata_buffer:get('serviceInstId'),
+ metadata_buffer:get('serviceInstanceUUID'),
+ ngx.now() * 1000)
+ local pingPkgParam = cjson.encode(pingPkg)
+
+ local http = require('resty.http')
+ local httpc = http.new()
+ local res, err = httpc:request_uri(backend_http_uri .. '/v2/instance/heartbeat', {
+ method = "POST",
+ body = pingPkgParam,
+ headers = {
+ ["Content-Type"] = "application/json",
+ },
+ })
+
+ if err == nil then
+ if res.status ~= 200 then
+ log(ERR, "Agent ping fails, response code " .. res.status)
+ end
+ else
+ log(ERR, "Agent ping fails, " .. err)
+ end
+end
+
+-- Report trace segments to the backend
+function Client:reportTraces(metadata_buffer, backend_http_uri)
+ local log = ngx.log
+ local DEBUG = ngx.DEBUG
+ local ERR = ngx.ERR
+
+ local queue = ngx.shared.tracing_buffer
+ local segment = queue:rpop('segment')
+
+ local count = 0;
+
+ local http = require('resty.http')
+ local httpc = http.new()
+
+ while segment ~= nil
+ do
+ local res, err = httpc:request_uri(backend_http_uri .. '/v2/segments', {
+ method = "POST",
+ body = segment,
+ headers = {
+ ["Content-Type"] = "application/json",
+ },
+ })
+
+ if err == nil then
+ if res.status ~= 200 then
+ log(ERR, "Segment report fails, response code " .. res.status)
+ break
+ else
+ count = count + 1
end
+ else
+ log(ERR, "Segment report fails, " .. err)
+ break
end
+
+ segment = queue:rpop('segment')
+ end
+
+ if count > 0 then
+ log(ERR, count, " segments reported.")
end
end
diff --git a/lib/skywalking/register.lua b/lib/skywalking/register.lua
index b6645bf..988552f 100644
--- a/lib/skywalking/register.lua
+++ b/lib/skywalking/register.lua
@@ -53,4 +53,14 @@ function Register:newServiceInstanceRegister(registeredServiceId, serviceInstUUI
return serviceInstances
end
+function Register:newServiceInstancePingPkg(registeredServiceInstId, serviceInstUUID, updateTime)
+ local serviceInstancePingPkg = {
+ serviceInstanceId = registeredServiceInstId,
+ time = updateTime,
+ serviceInstanceUUID = serviceInstUUID,
+ }
+
+ return serviceInstancePingPkg
+end
+
return Register
\ No newline at end of file
diff --git a/lib/skywalking/tracer.lua b/lib/skywalking/tracer.lua
index 2d8c3a8..7eceaff 100644
--- a/lib/skywalking/tracer.lua
+++ b/lib/skywalking/tracer.lua
@@ -17,13 +17,84 @@
local Tracer = {}
-function Tracer.new()
+function Tracer:startBackendTimer()
+ local metadata_buffer = ngx.shared.tracing_buffer
+ local TC = require('tracing_context')
+ local Layer = require('span_layer')
+
+ local tracingContext
+ local serviceName = metadata_buffer:get("serviceName")
+ local serviceInstId = metadata_buffer:get("serviceInstId")
+ local serviceId = metadata_buffer:get('serviceId')
+ if serviceInstId ~= nil then
+ tracingContext = TC:new(serviceId, serviceInstId)
+ else
+ tracingContext = TC:newNoOP()
+ end
+
+ -- Constant pre-defined in SkyWalking main repo
+ -- 84 represents Nginx
+ local nginxComponentId = 6000
+
+ local contextCarrier = {}
+ contextCarrier["sw6"] = ngx.req.get_headers()["sw6"]
+ local entrySpan = tracingContext:createEntrySpan(ngx.var.uri, nil, contextCarrier)
+ entrySpan:start(ngx.now() * 1000)
+ entrySpan:setComponentId(nginxComponentId)
+ entrySpan:setLayer(Layer.HTTP)
+
+ entrySpan:tag('http.method', ngx.req.get_method())
+ entrySpan:tag('http.params', ngx.var.scheme .. '://' .. ngx.var.host .. ngx.var.request_uri )
+
+ contextCarrier = {}
+ -- Use the same URI to represent incoming and forwarding requests
+ -- Change it if you need.
+ local upstreamUri = ngx.var.uri
+ ------------------------------------------------------
+ -- NOTICE, this should be changed manually
+ -- This variable represents the upstream logic address
+ -- Please set them as service logic name or DNS name
+ --
+ -- TODO, currently, we can't have the upstream real network address
+ ------------------------------------------------------
+ local upstreamServerName = serviceName .. "-nginx:upstream_ip:port"
+ ------------------------------------------------------
+ local exitSpan = tracingContext:createExitSpan(upstreamUri, entrySpan, upstreamServerName, contextCarrier)
+ exitSpan:start(ngx.now() * 1000)
+ exitSpan:setComponentId(nginxComponentId)
+ exitSpan:setLayer(Layer.HTTP)
+ for name, value in pairs(contextCarrier) do
+ ngx.req.set_header(name, value)
+ end
+
+ -- Push the data in the context
+ ngx.ctx.tracingContext = tracingContext
+ ngx.ctx.entrySpan = entrySpan
+ ngx.ctx.exitSpan = exitSpan
+end
+
+function Tracer:finish()
+ -- Finish the exit span when received the first response package from upstream
+ if ngx.ctx.exitSpan ~= nil then
+ ngx.ctx.exitSpan:finish(ngx.now() * 1000)
+ ngx.ctx.exitSpan = nil
+ end
end
+function Tracer:prepareForReport()
+ if ngx.ctx.entrySpan ~= nil then
+ ngx.ctx.entrySpan:finish(ngx.now() * 1000)
+ local status, segment = ngx.ctx.tracingContext:drainAfterFinished()
+ if status then
+ local segmentJson = require('cjson').encode(segment:transform())
+ ngx.log(ngx.DEBUG, 'segment = ' .. segmentJson)
-function Tracer.createEntrySpan(context, operationName, sw6_header_value)
-
+ local queue = ngx.shared.tracing_buffer
+ local length = queue:lpush('segment', segmentJson)
+ ngx.log(ngx.DEBUG, 'segment buffer size = ' .. queue:llen('segment'))
+ end
+ end
end
return Tracer
\ No newline at end of file