You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by sp...@apache.org on 2022/05/12 14:15:11 UTC

[apisix] branch master updated: feat: add pubsub framework (#7028)

This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4690feb42 feat: add pubsub framework (#7028)
4690feb42 is described below

commit 4690feb421779f5b79e8dd990dc00f4d3f1052d0
Author: Zeping Bai <bz...@apache.org>
AuthorDate: Thu May 12 22:15:05 2022 +0800

    feat: add pubsub framework (#7028)
---
 .github/workflows/build.yml                |   2 +-
 .github/workflows/centos7-ci.yml           |   2 +-
 Makefile                                   |   3 +
 apisix/core.lua                            |   1 +
 apisix/core/pubsub.lua                     | 228 +++++++++++++++++++++++++++++
 apisix/include/apisix/model/pubsub.proto   |  96 ++++++++++++
 docs/assets/images/pubsub-architecture.svg |   4 +
 docs/en/latest/config.json                 |   7 +
 docs/en/latest/pubsub.md                   |  59 ++++++++
 t/lib/pubsub.lua                           | 128 ++++++++++++++++
 t/pubsub/pubsub.t                          | 228 +++++++++++++++++++++++++++++
 11 files changed, 756 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 2fe71b9a1..6714e4277 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -31,7 +31,7 @@ jobs:
           - linux_openresty_1_17
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
           - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
 
     runs-on: ${{ matrix.platform }}
diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml
index b5ef89c07..03ae393b0 100644
--- a/.github/workflows/centos7-ci.yml
+++ b/.github/workflows/centos7-ci.yml
@@ -29,7 +29,7 @@ jobs:
       matrix:
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
           - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library
 
     steps:
diff --git a/Makefile b/Makefile
index 3bc974295..de2e7a52a 100644
--- a/Makefile
+++ b/Makefile
@@ -264,6 +264,9 @@ install: runtime
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix
 	$(ENV_INSTALL) apisix/*.lua $(ENV_INST_LUADIR)/apisix/
 
+	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/include/apisix/model
+	$(ENV_INSTALL) apisix/include/apisix/model/*.proto $(ENV_INST_LUADIR)/apisix/include/apisix/model/
+
 	$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/admin
 	$(ENV_INSTALL) apisix/admin/*.lua $(ENV_INST_LUADIR)/apisix/admin/
 
diff --git a/apisix/core.lua b/apisix/core.lua
index f448f9549..e03996900 100644
--- a/apisix/core.lua
+++ b/apisix/core.lua
@@ -52,4 +52,5 @@ return {
     tablepool   = require("tablepool"),
     resolver    = require("apisix.core.resolver"),
     os          = require("apisix.core.os"),
+    pubsub      = require("apisix.core.pubsub"),
 }
diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua
new file mode 100644
index 000000000..14df27616
--- /dev/null
+++ b/apisix/core/pubsub.lua
@@ -0,0 +1,228 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+-- lua-protobuf state holding the compiled pubsub protocol; nil until
+-- init_pb_state() has succeeded once
+local pb_state
+
+
+-- Compile pubsub.proto into its own lua-protobuf state and keep that
+-- state in the module-level `pb_state`.
+--
+-- The pb state that was current before the call is put back afterwards,
+-- both on success and on failure, so other users of the `pb` module are
+-- not left with an empty or pubsub-only state.
+--
+-- @treturn string|nil error message when the protocol file cannot be
+-- loaded, nothing on success
+local function init_pb_state()
+    -- detach the current pb state so that the pubsub types are compiled
+    -- into a fresh one; remember the previous state to restore it later
+    local old_pb_state = pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            -- do not leave the caller with the half-initialized state
+            pb.state(old_pb_state)
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    -- keep the freshly built pubsub state and reinstate the previous one
+    pb_state = pb.state(old_pb_state)
+end
+
+
+-- Send an error response to the client.
+--
+-- The message is encoded as a PubSubResp whose error_resp carries code 0
+-- and the given text. Encoding and sending failures are only logged.
+--
+-- @tparam table ws websocket server object used to send the frame
+-- @param sequence sequence of the request this error answers
+-- @tparam string err_msg human readable error message
+local function send_error(ws, sequence, err_msg)
+    local ok, data = pcall(pb.encode, "PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    })
+    if not ok or not data then
+        log.error("failed to encode error response message, err: ", data)
+        -- there is no valid payload: on failure `data` is nil or the
+        -- error raised by pb.encode, so it must not be sent to the client
+        return
+    end
+
+    local _, err = ws:send_binary(data)
+    if err then
+        log.error("failed to send response to client, err: ", err)
+    end
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- On first use the pubsub protocol is compiled (see init_pb_state); then a
+-- websocket server object is created for the current request. The returned
+-- instance starts with an empty command handler table.
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance, nil on failure
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    -- the protocol only needs to be compiled once per loaded module
+    if not pb_state then
+        local init_err = init_pb_state()
+        if init_err then
+            return nil, init_err
+        end
+    end
+
+    local conn, ws_err = ws_server:new()
+    if not conn then
+        return nil, ws_err
+    end
+
+    return setmetatable({
+        ws_server = conn,
+        cmd_handler = {},
+    }, mt)
+end
+
+
+---
+-- Register the callback for a command on a pubsub module instance
+--
+-- Registering a command again replaces the previous callback.
+--
+-- The callback function prototype: function (params)
+-- `params` is the data of the requested command as sent by the client.
+-- The first return value is the response data; return nil when an error
+-- occurred. The second return value is an error message string and is only
+-- needed when the first one is nil.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function handler callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    local handlers = self.cmd_handler
+    handlers[command] = handler
+end
+
+
+-- Pick the command out of a decoded PubSubReq.
+--
+-- The decoded table holds the `sequence` property next to the command
+-- property; the first key that is not `sequence` is treated as the command.
+--
+-- @tparam table data decoded request
+-- @treturn string|nil command name, nil when the request has no command
+-- @return the data carried by that command
+local function find_command(data)
+    for key, value in pairs(data) do
+        if key ~= "sequence" then
+            return key, value
+        end
+    end
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- Each binary frame is decoded as a PubSubReq and dispatched to the handler
+-- registered with `on`; the handler result is sent back as a PubSubResp
+-- carrying the request sequence. The loop ends when the client closes the
+-- connection or a fatal websocket error occurs, after which a close frame
+-- is sent. The function returns nothing.
+--
+-- @function core.pubsub.wait
+-- @usage
+-- pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            break
+        end
+
+        -- the pubsub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ", data: ", raw_data)
+            goto continue
+        end
+
+        -- switch to the stored pb_state before decoding
+        pb.state(pb_state)
+
+        local data, decode_err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", decode_err)
+            send_error(ws, 0, "wrong command")
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- look the command up first, so that the "empty command" warning
+        -- is only written when the request really carries no command
+        local command, params = find_command(data)
+        if not command then
+            log.warn("pubsub server receives empty command")
+            goto continue
+        end
+
+        local handler = self.cmd_handler[command]
+        if not handler then
+            log.error("pubsub callback handler not registered for the",
+                " command, command: ", command)
+            send_error(ws, sequence, "unknown command: " .. command)
+            goto continue
+        end
+
+        -- call command handler to generate response data
+        local resp, handler_err = handler(params)
+        if not resp then
+            send_error(ws, sequence, handler_err)
+            goto continue
+        end
+
+        -- write back the sequence
+        resp.sequence = sequence
+        local ok, encoded = pcall(pb.encode, "PubSubResp", resp)
+        if not ok or not encoded then
+            log.error("failed to encode response message, err: ", encoded)
+            goto continue
+        end
+
+        local _, send_err = ws:send_binary(encoded)
+        if send_err then
+            log.error("failed to send response to client, err: ", send_err)
+        end
+
+        ::continue::
+    end
+
+    if fatal_err then
+        log.error("fatal error in pubsub websocket server, err: ", fatal_err)
+    end
+    ws:send_close()
+end
+
+
+return _M
diff --git a/apisix/include/apisix/model/pubsub.proto b/apisix/include/apisix/model/pubsub.proto
new file mode 100644
index 000000000..fdaa5e8cb
--- /dev/null
+++ b/apisix/include/apisix/model/pubsub.proto
@@ -0,0 +1,96 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+syntax = "proto3";
+
+option java_package = "org.apache.apisix.api.pubsub";
+option java_outer_classname = "PubSubProto";
+option java_multiple_files = true;
+option go_package = "github.com/apache/apisix/api/pubsub;pubsub";
+
+/**
+ * Ping command, used to keep the websocket connection alive
+ *
+ * The state field is used to pass some non-specific information,
+ * which will be returned in the pong response as is.
+ */
+message CmdPing {
+    bytes state = 1;
+}
+
+/**
+ * An empty command, a placeholder for testing purposes only
+ */
+message CmdEmpty {}
+
+/**
+ * Client request definition for pubsub scenarios
+ *
+ * The sequence field is used to associate requests and responses.
+ * Apache APISIX will set a consistent sequence for the associated
+ * requests and responses, and the client can explicitly know the
+ * response corresponding to any of the requests.
+ *
+ * The req field carries the command sent by the client; its type
+ * is exactly one of the command messages listed in the oneof below.
+ *
+ * Field numbers 1 to 30 in the definition are used to define basic
+ * information and future extensions, and numbers after 30 are used
+ * to define commands.
+ */
+message PubSubReq {
+    int64 sequence = 1;
+    oneof req {
+        CmdEmpty cmd_empty = 31;
+        CmdPing cmd_ping = 32;
+    };
+}
+
+/**
+ * The response body of the service when an error occurs,
+ * containing the error code and the error message.
+ */
+ message ErrorResp {
+    int32 code = 1;
+    string message = 2;
+}
+
+/**
+ * Pong response, the state field will pass through the
+ * value in the Ping command field.
+ */
+message PongResp {
+    bytes state = 1;
+}
+
+/**
+ * Server response definition for pubsub scenarios
+ *
+ * The sequence field carries the same value as the sequence
+ * of the request, so that the client can match each response
+ * to the request it answers.
+ *
+ * The resp field carries the response sent by the server; its type
+ * is exactly one of the response messages listed in the oneof below.
+ */
+message PubSubResp {
+    int64 sequence = 1;
+    oneof resp {
+        ErrorResp error_resp = 31;
+        PongResp pong_resp = 32;
+    };
+}
diff --git a/docs/assets/images/pubsub-architecture.svg b/docs/assets/images/pubsub-architecture.svg
new file mode 100644
index 000000000..9bce53bb1
--- /dev/null
+++ b/docs/assets/images/pubsub-architecture.svg
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Do not edit this file with editors other than diagrams.net -->
+<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="753px" height="435px" viewBox="-0.5 -0.5 753 435" content="&lt;mxfile host=&quot;app.diagrams.net&quot; modified=&quot;2022-05-08T19:55:16.347Z&quot; agent=&quot;5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36 Edg/101.0.1210.39&quot; etag=&quot;BURacUlv9Cn2ofaKhzC8&quot; version=&quot;17.5.0&quot; type=&quot;device&quot;&gt;&l [...]
\ No newline at end of file
diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json
index 6991ee3cb..60b02af34 100644
--- a/docs/en/latest/config.json
+++ b/docs/en/latest/config.json
@@ -212,6 +212,13 @@
             "discovery/kubernetes"
           ]
         },
+        {
+          "type": "category",
+          "label": "PubSub",
+          "items": [
+            "pubsub"
+          ]
+        },
         {
           "type": "category",
           "label": "xRPC",
diff --git a/docs/en/latest/pubsub.md b/docs/en/latest/pubsub.md
new file mode 100644
index 000000000..96209a486
--- /dev/null
+++ b/docs/en/latest/pubsub.md
@@ -0,0 +1,59 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm:
+
+- Producers send messages to specific brokers rather than directly to consumers.
+- Brokers cache messages sent by producers and then either actively push them to subscribed consumers or let consumers pull them.
+
+System architectures use this pattern to decouple components or to handle high-traffic scenarios.
+
+In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. Combining it with a publish-subscribe system, we can achieve more robust features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism; see the [protocol definition](../../../apisix/include/apisix/model/pubsub.proto).
+
+## How to support other messaging systems
+
+Apache APISIX implements an extensible pubsub module, which is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands. Support for a new messaging system is added on top of this module.
+
+### Basic Steps
+
+- Add new commands and response body definitions to `pubsub.proto`
+- Add a new option to the `scheme` configuration item in upstream
+- Add a new `scheme` judgment branch to `http_access_phase`
+- Implement the required message system instruction processing functions
+- Optional: Create plugins to support advanced configurations of this messaging system
+
+### Example
+
+TODO, an example will be added later to point out how to support other messaging systems.
diff --git a/t/lib/pubsub.lua b/t/lib/pubsub.lua
new file mode 100644
index 000000000..a2729a871
--- /dev/null
+++ b/t/lib/pubsub.lua
@@ -0,0 +1,128 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local ws_client = require "resty.websocket.client"
+local protoc    = require("protoc")
+local pb        = require("pb")
+
+local _M = {}
+local mt = { __index = _M }
+
+
+-- pb state holding the compiled pubsub protocol for the test client
+local pb_state
+
+-- Compile pubsub.proto into a fresh pb state and store it in `pb_state`.
+-- Returns the load error on failure and nothing on success.
+local function load_proto()
+    pb.state(nil)
+    protoc.reload()
+    pb.option("int64_as_string")
+
+    local compiler = protoc.new()
+    compiler:addpath("apisix/include/apisix/model")
+
+    local loaded, load_err = pcall(compiler.loadfile, compiler, "pubsub.proto")
+    if loaded then
+        -- take the state that now contains the pubsub types
+        pb_state = pb.state(nil)
+        return
+    end
+
+    ngx.log(ngx.ERR, "failed to load protocol: "..load_err)
+    return load_err
+end
+
+
+-- Create a websocket client and connect it to `endpoint`.
+-- Returns the connected client, or nil plus the error (which is also logged).
+local function init_websocket_client(endpoint)
+    local client, new_err = ws_client:new()
+    if not client then
+        ngx.log(ngx.ERR, "failed to create websocket client: "..new_err)
+        return nil, new_err
+    end
+
+    local connected, conn_err = client:connect(endpoint)
+    if connected then
+        return client
+    end
+
+    ngx.log(ngx.ERR, "failed to connect: "..conn_err)
+    return nil, conn_err
+end
+
+
+-- Build a pubsub test client connected to the websocket endpoint `server`.
+-- The protocol is (re)loaded first; returns the client object, or nil plus
+-- the error when loading the protocol or connecting fails.
+function _M.new_ws(server)
+    local proto_err = load_proto()
+    if proto_err then
+        return nil, proto_err
+    end
+
+    local client, ws_err = init_websocket_client(server)
+    if not client then
+        return nil, ws_err
+    end
+
+    return setmetatable({
+        type = "ws",
+        ws_client = client,
+    }, mt)
+end
+
+
+-- Send one binary frame and wait for the response frame.
+--
+-- `data` is encoded as a PubSubReq unless `is_raw` is truthy, in which case
+-- it is sent as is. The received frame is decoded as a PubSubResp.
+-- Returns the decoded response, or nil plus the error.
+function _M.send_recv_ws_binary(self, data, is_raw)
+    pb.state(pb_state)
+    local client = self.ws_client
+
+    local payload = data
+    if not is_raw then
+        payload = pb.encode("PubSubReq", data)
+    end
+
+    local _, send_err = client:send_binary(payload)
+    if send_err then
+        return nil, send_err
+    end
+
+    local frame, _, recv_err = client:recv_frame()
+    if not frame then
+        ngx.log(ngx.ERR, "failed to receive the frame: ", recv_err)
+        return nil, recv_err
+    end
+
+    local resp, decode_err = pb.decode("PubSubResp", frame)
+    if resp then
+        return resp
+    end
+
+    ngx.log(ngx.ERR, "failed to decode the frame: ", decode_err)
+    return nil, decode_err
+end
+
+
+-- Send `text` as a text frame and wait for the response frame, which is
+-- decoded as a PubSubResp.
+-- Returns the decoded response, or nil plus the error.
+function _M.send_recv_ws_text(self, text)
+    pb.state(pb_state)
+    local client = self.ws_client
+
+    local _, send_err = client:send_text(text)
+    if send_err then
+        return nil, send_err
+    end
+
+    local frame, _, recv_err = client:recv_frame()
+    if not frame then
+        ngx.log(ngx.ERR, "failed to receive the frame: ", recv_err)
+        return nil, recv_err
+    end
+
+    local resp, decode_err = pb.decode("PubSubResp", frame)
+    if resp then
+        return resp
+    end
+
+    ngx.log(ngx.ERR, "failed to decode the frame: ", decode_err)
+    return nil, decode_err
+end
+
+
+-- Send a websocket close frame through the client of this instance.
+function _M.close_ws(self)
+    local client = self.ws_client
+    client:send_close()
+end
+
+
+return _M
diff --git a/t/pubsub/pubsub.t b/t/pubsub/pubsub.t
new file mode 100644
index 000000000..84b80f532
--- /dev/null
+++ b/t/pubsub/pubsub.t
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t("/apisix/admin/routes/pubsub", ngx.HTTP_PUT, {
+                plugins = {
+                    ["serverless-pre-function"] = {
+                        phase = "access",
+                        functions =  {
+                            [[return function(conf, ctx)
+                                local core = require("apisix.core");
+                                local pubsub, err = core.pubsub.new()
+                                if not pubsub then
+                                    core.log.error("failed to initialize pubsub module, err: ", err)
+                                    core.response.exit(400)
+                                    return
+                                end
+                                pubsub:on("cmd_ping", function (params)
+                                    if params.state == "test" then
+                                        return {pong_resp = {state = "test"}}
+                                    end
+                                    return nil, "error"
+                                end)
+                                pubsub:wait()
+                                ngx.exit(0)
+                            end]],
+                        }
+                    }
+                },
+                uri = "/pubsub"
+            })
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: hit route (with HTTP request)
+--- request
+GET /pubsub
+--- error_code: 400
+--- error_log
+failed to initialize pubsub module, err: bad "upgrade" request header: nil
+
+
+
+=== TEST 3: connect websocket service
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws_binary({
+                sequence = 0,
+                cmd_ping = {
+                    state = "test"
+                },
+            })
+            if data and data.pong_resp then
+                ngx.say("ret: ", data.pong_resp.state)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+ret: test
+
+
+
+=== TEST 4: connect websocket service (return error)
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws_binary({
+                sequence = 0,
+                cmd_ping = {
+                    state = "non-test"
+                },
+            })
+            if data and data.error_resp then
+                ngx.say("ret: ", data.error_resp.message)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+ret: error
+
+
+
+=== TEST 5: send unregistered command
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws_binary({
+                sequence = 0,
+                cmd_empty = {},
+            })
+            if data and data.error_resp then
+                ngx.say(data.error_resp.message)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+unknown command: cmd_empty
+--- error_log
+pubsub callback handler not registered for the command, command: cmd_empty
+
+
+
+=== TEST 6: send text command (server skip command, keep connection)
+--- config
+    location /t {
+        lua_check_client_abort on;
+        content_by_lua_block {
+            ngx.on_abort(function ()
+                ngx.log(ngx.ERR, "text command is skipped, and close connection")
+                ngx.exit(444)
+            end)
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            test_pubsub:send_recv_ws_text("test")
+            test_pubsub:close_ws()
+        }
+    }
+--- abort
+--- ignore_response
+--- error_log
+pubsub server receive non-binary data, type: text, data: test
+text command is skipped, and close connection
+fatal error in pubsub websocket server, err: failed to receive the first 2 bytes: closed
+
+
+
+=== TEST 7: send wrong command: empty (server skip command, keep connection)
+--- config
+    location /t {
+        lua_check_client_abort on;
+        content_by_lua_block {
+            ngx.on_abort(function ()
+                ngx.log(ngx.ERR, "empty command is skipped, and close connection")
+                ngx.exit(444)
+            end)
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            test_pubsub:send_recv_ws_binary({})
+            test_pubsub:close_ws()
+        }
+    }
+--- abort
+--- ignore_response
+--- error_log
+pubsub server receives empty command
+empty command is skipped, and close connection
+fatal error in pubsub websocket server, err: failed to receive the first 2 bytes: closed
+
+
+
+=== TEST 8: send wrong command: undecodable (server skip command, keep connection)
+--- config
+    location /t {
+        lua_check_client_abort on;
+        content_by_lua_block {
+            ngx.on_abort(function ()
+                ngx.log(ngx.ERR, "empty command is skipped, and close connection")
+                ngx.exit(444)
+            end)
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            test_pubsub:send_recv_ws_binary("!@#$%^&*中文", true)
+            test_pubsub:close_ws()
+        }
+    }
+--- abort
+--- ignore_response
+--- error_log
+pubsub server receives empty command
+empty command is skipped, and close connection
+fatal error in pubsub websocket server, err: failed to receive the first 2 bytes: closed