You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/05/12 09:29:28 UTC

[GitHub] [apisix] spacewander commented on a diff in pull request #7028: feat: add pubsub framework

spacewander commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871150746


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback

Review Comment:
   The syntax is `@tparam type name desc`, see https://github.com/apache/apisix/blob/6ddca102503a52822407301fdffb869e32fe59e9/apisix/core/json.lua#L110



##########
.github/workflows/build.yml:
##########
@@ -31,7 +31,7 @@ jobs:
           - linux_openresty_1_17
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
           - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc

Review Comment:
   plugin < pubsub < node, so we should put it here



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   Better to use break in the while loop and handle the error in the same place



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pubsub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ", data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            send_error_resp(ws, 0, "wrong command")
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " command, command: ", key)
+                    send_error_resp(ws, sequence, "unknown command: " .. key)
+                    break
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    send_error_resp(ws, sequence, err)

Review Comment:
   Would be better to handle err in the various `send_xxx` operation



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()

Review Comment:
   Should we always send connection outside the loop?



##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pubsub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)

Review Comment:
   Better not directly print the 201 status code, because when we rerun the test, code 200 will return instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org