Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/05/11 10:01:55 UTC

[GitHub] [apisix] bzp2010 opened a new pull request, #7028: feat: add pubsub framework

bzp2010 opened a new pull request, #7028:
URL: https://github.com/apache/apisix/pull/7028

   ### Description
   
   This PR adds the basic framework for publish-subscribe scenarios, implemented with WebSocket + protobuf.
   
   Split from #6995
   
   ### Checklist
   
   - [x] I have explained the need for this PR and the problem it solves
   - [x] I have explained the changes or the new features added to this PR
   - [x] I have added tests corresponding to this change
   - [x] I have updated the documentation to reflect this change
   - [x] I have verified that this change is backward compatible (If not, please discuss on the [APISIX mailing list](https://github.com/apache/apisix/tree/master#community) first)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871374707


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {

Review Comment:
   Fixed. Following the practice in `grpc-transcode`, the `pb.encode` call is now wrapped in `pcall` for error handling, and a log is printed for errors in both cases.
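
The pattern described in this reply can be sketched as follows. This is a minimal, self-contained illustration rather than the code in the PR: `fake_encode`, `safe_encode`, `send_resp` and the fake `ws` table are hypothetical names, and `fake_encode` only stands in for `pb.encode` so the sketch runs without lua-protobuf, on the assumption that the real encoder can raise an error on invalid input.

```lua
-- Hypothetical stand-in for pb.encode (assumption: the real encoder may
-- raise an error instead of returning one).
local function fake_encode(msg_type, data)
    if type(data) ~= "table" then
        error("bad argument: table expected, got " .. type(data))
    end
    return msg_type .. ":" .. tostring(data.sequence)
end

-- Wrap the encoder in pcall so a raised error becomes a (nil, err) return.
local function safe_encode(encode, msg_type, data)
    local ok, res = pcall(encode, msg_type, data)
    if not ok then
        return nil, "failed to encode " .. msg_type .. ": " .. tostring(res)
    end
    return res
end

-- Encode and send a response; on encode failure nothing is sent.
local function send_resp(ws, encode, data)
    local payload, err = safe_encode(encode, "PubSubResp", data)
    if not payload then
        -- a real module would log the error here
        return nil, err
    end
    return ws:send_binary(payload)
end

-- Fake websocket object that records what was sent (hypothetical).
local sent = {}
local ws = {
    send_binary = function(self, payload)
        sent[#sent + 1] = payload
        return #payload
    end,
}

local bytes = send_resp(ws, fake_encode, { sequence = 1 })
local failed, err = send_resp(ws, fake_encode, "not a table")
```

Turning the raised error into a `nil, err` return keeps one bad payload from aborting the whole request, and gives the caller a single place to log the failure.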





[GitHub] [apisix] membphis commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
membphis commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870949550


##########
docs/zh/latest/pubsub.md:
##########
@@ -0,0 +1,248 @@
+---
+title: Pub-Sub Framework
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pub-sub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
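+## Summary
+
+Publish-subscribe is a messaging paradigm in which:
+
+- Message producers do not send messages directly to message consumers; instead, a dedicated broker relays them.
+- The broker caches the messages sent by producers, then either pushes them to subscribed consumers or lets consumers pull them.
+
+In system architecture, this pattern is commonly used to decouple systems or to handle high-traffic scenarios.
+
+In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. Combined with a publish-subscribe system, this enables more powerful features, such as real-time collaboration on online documents and online games.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX communicates with clients over WebSocket; the client can be any program that supports WebSocket. Protocol Buffer is used as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Currently supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)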

Review Comment:
   I think this is still WIP, right? Please update your doc.



##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pub-sub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)
+            end
+        }
+    }
+--- response_body
+201passed
+
+
+
+=== TEST 2: hit route (with HTTP request)
+--- request
+GET /pubsub
+--- error_code: 400
+--- error_log
+failed to initialize pub-sub module, err: bad "upgrade" request header: nil
+
+
+
+=== TEST 3: connect websocket service
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws({
+                sequence = 0,
+                cmd_kafka_list_offset = {
+                    topic = "test",
+                    partition = 0,
+                    timestamp = -2,
+                },
+            })
+            if data and data.error_resp then
+                ngx.say("ret: ", data.error_resp.message)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+ret: test

Review Comment:
   Do we need more test cases to confirm that the basic `pub/sub` flow works?



##########
docs/zh/latest/pubsub.md:
##########
@@ -0,0 +1,248 @@
+---
+title: Pub-Sub Framework
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pub-sub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
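+## Summary
+
+Publish-subscribe is a messaging paradigm in which:
+
+- Message producers do not send messages directly to message consumers; instead, a dedicated broker relays them.
+- The broker caches the messages sent by producers, then either pushes them to subscribed consumers or lets consumers pull them.
+
+In system architecture, this pattern is commonly used to decouple systems or to handle high-traffic scenarios.
+
+In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. Combined with a publish-subscribe system, this enables more powerful features, such as real-time collaboration on online documents and online games.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX communicates with clients over WebSocket; the client can be any program that supports WebSocket. Protocol Buffer is used as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Currently supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)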
+
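+## How to support other messaging systems
+
+Apache APISIX implements an extensible pubsub module for this purpose. It is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands, which makes it simple to add support for a new messaging system.
+
+### Basic steps
+
+- Add new command and response body definitions to `pubsub.proto`
+- Add a new option to the `scheme` configuration item in upstream
+- Add a new `scheme` branch to `http_access_phase`
+- Implement the command handler functions for the messaging system
+- Optional: create a plugin to support advanced configuration of the messaging system
+
+### Example: Apache Kafka
+
+#### Add new command and response body definitions to `pubsub.proto`
+
+The core of the protocol definition in `pubsub.proto` is its two parts, `PubSubReq` and `PubSubResp`.
+
+First, create the `CmdKafkaFetch` command and add the required parameters. Then register this command in the command list of `req` in `PubSubReq`, under the name `cmd_kafka_fetch`.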
+
+```protobuf
+message CmdKafkaFetch {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+}
+
+message PubSubReq {
+    int64 sequence = 1;
+    oneof req {
+        CmdKafkaFetch cmd_kafka_fetch = 31;
+        // more commands
+    };
+}
+```
+
+Next, create the corresponding response body `KafkaFetchResp` and register it in `resp` of `PubSubResp`, under the name `kafka_fetch_resp`.
+
+```protobuf
+message KafkaFetchResp {
+    repeated KafkaMessage messages = 1;
+}
+
+message PubSubResp {
+    int64 sequence = 1;
+    oneof resp {
+        ErrorResp error_resp = 31;
+        KafkaFetchResp kafka_fetch_resp = 32;
+        // more responses
+    };
+}
+```
+
+#### Add a new option to the `scheme` configuration item in upstream
+
+In `apisix/schema_def.lua`, add the new option `kafka` to the `scheme` field enumeration of `upstream`.
+
+```lua
+scheme = {
+    enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp", "kafka"},
+    -- other
+}
+```
+
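+#### Add a new `scheme` branch to `http_access_phase`
+
+In the `http_access_phase` function in `apisix/init.lua`, add a branch on `scheme` to handle upstreams of type `kafka`. Because Apache Kafka has its own clustering and partitioning scheme, the built-in load-balancing algorithms of Apache APISIX are not needed, so the flow is intercepted and taken over before an upstream node is selected, here by the `kafka_access_phase` function.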
+
+```lua
+-- load balancer is not required by kafka upstream
+if api_ctx.matched_upstream and api_ctx.matched_upstream.scheme == "kafka" then
+    return kafka_access_phase(api_ctx)
+end
+```
+
+#### Implement the command handler functions for the messaging system
+
+```lua
+local function kafka_access_phase(api_ctx)
+    local pubsub, err = core.pubsub.new()
+
+    -- omit kafka client initialization code here
+
+    pubsub:on("cmd_kafka_list_offset", function (params)
+        -- call kafka client to get data
+    end)
+
+    pubsub:wait()
+end
+```
+
+First, create an instance of the `pubsub` module, which is provided in the `core` package.
+
+```lua
+local pubsub, err = core.pubsub.new()
+```
+
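+Create the Apache Kafka client instance you need; that code is omitted here.
+
+Next, register on the `pubsub` instance the command defined in the protocol above, supplying a callback function. The callback receives the parameters parsed from the communication protocol; the developer calls the Kafka client inside it to fetch data and returns that data to the `pubsub` module as the function's return value.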
+
+```lua
+pubsub:on("cmd_kafka_list_offset", function (params)
+end)
+```
+
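+:::note Callback function prototype
+`params` is the data from the protocol definition. The first return value is the data, which must contain the fields of the response body definition, or `nil` when an error occurs. The second return value is the error; it is an error string when an error occurs.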
+
+```lua
+function (params)
+    return data, err
+end
+```
+
+:::
+
+Finally, enter the loop that waits for client commands. When an error occurs, it returns the error and stops processing.
+
+```lua
+local err = pubsub:wait()
+```
+
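+#### Optional: create a `kafka-proxy` plugin to support its authentication configuration
+
+Add the required fields to the plugin schema definition, then write them into the context of the current request in the `access` handler.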

Review Comment:
   ditto



##########
docs/zh/latest/pubsub.md:
##########
@@ -0,0 +1,248 @@

Review Comment:
   Also, I think you can submit only the English version for now; we can submit the Chinese version later.
   
   A small PR is easier to review.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871009445


##########
docs/zh/latest/pubsub.md:
##########
@@ -0,0 +1,248 @@

Review Comment:
   Chinese version removed. This is a developer example; it does not depend on whether I have provided an implementation. Anyone can follow these steps to implement Kafka support in their own version.
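
To make the callback contract from the document concrete, here is a minimal sketch of how a registered handler's `(data, err)` pair might be turned into a response. It is not the PR's implementation: `new_registry` and `dispatch` are hypothetical names, and the way the command is located in the request (the first key other than `sequence`) is an assumption made for illustration. The command and response names are the ones used in the quoted document and test.

```lua
-- Hypothetical registry mirroring the on(command, handler) interface.
local Registry = {}
Registry.__index = Registry

local function new_registry()
    return setmetatable({ cmd_handler = {} }, Registry)
end

function Registry:on(command, handler)
    self.cmd_handler[command] = handler
end

-- Build an error response in the shape of PubSubResp.error_resp.
local function error_resp(sequence, message)
    return { sequence = sequence, error_resp = { code = 0, message = message } }
end

-- Assumption: a decoded request holds `sequence` plus exactly one command key.
function Registry:dispatch(req)
    for key, params in pairs(req) do
        if key ~= "sequence" then
            local handler = self.cmd_handler[key]
            if not handler then
                return error_resp(req.sequence, "unknown command: " .. key)
            end
            -- callback prototype: function (params) return data, err end
            local data, err = handler(params)
            if not data then
                return error_resp(req.sequence, err)
            end
            data.sequence = req.sequence
            return data
        end
    end
    return error_resp(req.sequence, "empty request")
end

local reg = new_registry()
reg:on("cmd_kafka_list_offset", function(params)
    return nil, "test"
end)
reg:on("cmd_kafka_fetch", function(params)
    return { kafka_fetch_resp = { messages = {} } }
end)

local failed = reg:dispatch({
    sequence = 0,
    cmd_kafka_list_offset = { topic = "test", partition = 0, timestamp = -2 },
})
local fetched = reg:dispatch({
    sequence = 1,
    cmd_kafka_fetch = { topic = "test", partition = 0, offset = 0 },
})
```

A handler that returns `nil` plus a message yields an `error_resp`, which matches the `ret: test` result expected by the quoted test case; a handler that returns a table yields that table with the request's `sequence` attached.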





[GitHub] [apisix] spacewander commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871267030


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   Can we use `break` here and check `fatal_err` afterwards? Using both `break` and `return` inside the loop is a code smell;
   it makes the control flow error-prone.
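
One way to read this suggestion is sketched here: a loop with a single exit point, where every terminating branch uses `break` and the function returns once, after the loop. This is only an illustration under stated assumptions: `new_fake_ws` is a hypothetical stand-in for the websocket object, the `wait` function is not the PR's code, and moving `send_close()` after the loop is a choice made for the sketch.

```lua
-- Hypothetical frame source standing in for a websocket connection.
local function new_fake_ws(frames)
    local ws = { fatal = false, closed = 0, idx = 0 }
    function ws:recv_frame()
        self.idx = self.idx + 1
        local frame = frames[self.idx]
        if not frame then
            -- no more frames: simulate a fatal connection error
            self.fatal = true
            return nil, nil, "connection reset"
        end
        return frame.data, frame.type, frame.err
    end
    function ws:send_close()
        self.closed = self.closed + 1
    end
    return ws
end

-- Event loop with one exit: terminating branches record their reason
-- (if any) in fatal_err and break; the only return is after the loop.
local function wait(ws, on_frame)
    local fatal_err
    while true do
        local data, typ, err = ws:recv_frame()
        if err then
            if ws.fatal then
                fatal_err = err
                break
            end
            -- non-fatal error: skip this frame and keep looping
        elseif typ == "close" then
            -- client closed the connection: not an error, fatal_err stays nil
            break
        else
            on_frame(data)
        end
    end
    ws:send_close()
    return fatal_err
end

local seen = {}
local function collect(data)
    seen[#seen + 1] = data
end

local ws1 = new_fake_ws({ { data = "a", type = "binary" }, { type = "close" } })
local err1 = wait(ws1, collect)

local ws2 = new_fake_ws({ { data = "b", type = "binary" } })
local err2 = wait(ws2, collect)
```

With a single exit, the cleanup (`send_close`) and the return value live in one place, so a newly added branch cannot accidentally skip either.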





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r872126400


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback

Review Comment:
   Will continue adjusting this in #7043.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871296874


##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pubsub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)

Review Comment:
   fixed





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871017987


##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pub-sub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)
+            end
+        }
+    }
+--- response_body
+201passed
+
+
+
+=== TEST 2: hit route (with HTTP request)
+--- request
+GET /pubsub
+--- error_code: 400
+--- error_log
+failed to initialize pub-sub module, err: bad "upgrade" request header: nil
+
+
+
+=== TEST 3: connect websocket service
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws({
+                sequence = 0,
+                cmd_kafka_list_offset = {
+                    topic = "test",
+                    partition = 0,
+                    timestamp = -2,
+                },
+            })
+            if data and data.error_resp then
+                ngx.say("ret: ", data.error_resp.message)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+ret: test

Review Comment:
   Tests have covered testable log points and possible error branches.





[GitHub] [apisix] bzp2010 commented on pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on PR #7028:
URL: https://github.com/apache/apisix/pull/7028#issuecomment-1124109788

   ### Update
   
   The pubsub module now uses its own module-level `pb_state` database, similar to the `pb_state` that `grpc-transcode` caches in lrucache, and switches to that cached `pb_state` before decoding each command.
   
   **What I'm not sure about is whether, in an extreme case, `grpc-transcode` and pubsub switching `pb_state` concurrently could interfere with each other. 🤔**
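
One way to reason about the concern above is a switch-and-restore wrapper. The sketch below is only an illustration: `fake_pb` is a simplified stand-in for the `pb` module (every call installs the given state and returns the previous one), and `with_pb_state` is a hypothetical helper, not part of APISIX or `lua-protobuf`. Since Lua code in an OpenResty worker is scheduled cooperatively, a swap that is followed by a non-yielding decode and an immediate restore should not be observable by another request, assuming nothing yields in between.

```lua
-- Simplified stand-in for pb.state: installs new_state, returns the old one.
local fake_pb = { current = "grpc-state" }

function fake_pb.state(new_state)
    local old = fake_pb.current
    fake_pb.current = new_state
    return old
end

-- Hypothetical helper: run fn with `state` installed, then restore the
-- previous state even when fn raises an error.
local function with_pb_state(pb, state, fn, ...)
    local old = pb.state(state)
    local ok, res = pcall(fn, ...)
    pb.state(old)
    if not ok then
        return nil, res
    end
    return res
end
```

With a wrapper of this shape, each module only ever sees its own state while its callback runs, and the caller's state is put back afterwards.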




[GitHub] [apisix] membphis commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
membphis commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871306191


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)

Review Comment:
   we can remove `nil` here





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r872124654


##########
.github/workflows/build.yml:
##########
@@ -31,7 +31,7 @@ jobs:
           - linux_openresty_1_17
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
           - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc

Review Comment:
   Fixed in #7043. Following https://github.com/apache/apisix/pull/6995#discussion_r869835933, I placed it in alphabetical order, between `node` and `router`.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871381746


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)

Review Comment:
   This follows the approach in `grpc-transcode`, which also uses `pb.state(nil)`. According to the `lua-protobuf` documentation, `pb.state(nil)` and `pb.state()` behave differently.
   
   https://github.com/starwing/lua-protobuf/blob/master/README.md?plain=1#L208-L209
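
As a small illustration of why the two calls can differ: Lua itself can tell "no argument" apart from "an explicit `nil`" through the argument count. The sketch below uses a hypothetical `state` function written in plain Lua; it is not the `lua-protobuf` implementation, only a model of a getter that becomes a setter as soon as any argument is passed.

```lua
-- Hypothetical stand-in for a state accessor (not the lua-protobuf code).
local current = "default-state"

local function state(...)
    local old = current
    if select("#", ...) == 0 then
        -- no argument at all: read-only access, nothing changes
        return old
    end
    -- an argument was passed, even if it is nil: replace the state
    current = ...
    return old
end
```

Under this model, `state()` only reads the current value, while `state(nil)` clears it and returns the previous value, which is why dropping the `nil` would change behavior.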



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)

Review Comment:
   ditto





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871378748


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback

Review Comment:
   fixed
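
The callback contract described in the doc comment of the hunk above (first return value is the data, second is an error string) can be sketched as follows. This is a minimal stand-in, not the module under review: `instance`, `dispatch`, and the shape of the returned table are assumptions made for illustration.

```lua
-- Stand-in for a pubsub instance: only the handler table and on() are modelled.
local instance = { cmd_handler = {} }

function instance:on(command, handler)
    self.cmd_handler[command] = handler
end

-- Hypothetical dispatcher: look up the handler and normalise its two
-- return values into (data) or (nil, err).
function instance:dispatch(command, params)
    local handler = self.cmd_handler[command]
    if not handler then
        return nil, "unknown command: " .. command
    end
    local data, err = handler(params)
    if not data then
        return nil, err or "handler returned no data"
    end
    return data
end

-- A handler following the contract: data on success, nil plus message on error.
instance:on("cmd_kafka_list_offset", function(params)
    if type(params.topic) ~= "string" then
        return nil, "topic is required"
    end
    return { topic = params.topic, partition = params.partition or 0 }
end)
```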





[GitHub] [apisix] membphis commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
membphis commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871305460


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {

Review Comment:
   We need to check the return values here, right?
   
   Both `pb.encode` and `ws:send_binary` may fail.
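
A minimal sketch of what checking both calls could look like. To keep it runnable on its own, the encoder and the WebSocket object are passed in as parameters: `encode` stands in for `pb.encode` and `ws` for the `lua-resty-websocket` server object. The extra `encode` parameter and the returned error strings are assumptions for illustration, not the change made in the PR.

```lua
-- Hypothetical variant of send_error_resp that reports failures to the caller.
local function send_error_resp(ws, encode, sequence, err_msg)
    -- the encoder may raise an error, so call it in protected mode
    local ok, encoded = pcall(encode, "PubSubResp", {
        sequence = sequence,
        error_resp = {
            code = 0,
            message = err_msg,
        },
    })
    if not ok or not encoded then
        return nil, "failed to encode response: " .. tostring(encoded)
    end

    -- send_binary returns nil plus an error string on failure
    local bytes, err = ws:send_binary(encoded)
    if not bytes then
        return nil, "failed to send response: " .. tostring(err)
    end

    return true
end
```

In the module itself the caller would probably log the returned error rather than propagate it, since the client connection may already be gone at that point.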





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871315021


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {

Review Comment:
   https://github.com/starwing/lua-protobuf/blob/master/README.md?plain=1#L208-L209
   
   According to the `lua-protobuf` documentation, calling `pb.state()` with no argument behaves differently from calling it with `nil`. I'm not sure whether passing no argument and passing `nil` mean the same thing across the Lua FFI boundary, since `pb` is a dynamically linked library module.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871315552


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)

Review Comment:
   https://github.com/starwing/lua-protobuf/blob/master/README.md?plain=1#L208-L209
   
   According to the `lua-protobuf` documentation, calling `pb.state()` with no argument behaves differently from calling it with `nil`. I'm not sure whether passing no argument and passing `nil` mean the same thing across the Lua FFI boundary, since `pb` is a dynamically linked library module.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870915666


##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+Apache APISIX implements an extensible pubsub module that is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands. Support for new messaging systems can be added through this module.
+
+### Basic Steps
+
+- Add new commands and response body definitions to `pubsub.proto`
+- Add a new option to the `scheme` configuration item in upstream
+- Add a new `scheme` judgment branch to `http_access_phase`
+- Implement the required message system instruction processing functions
+- Optional: Create plugins to support advanced configurations of this messaging system
+
+### The example of Apache Kafka
+
+#### Add new commands and response body definitions to `pubsub.proto`
+
+The core of the protocol definition in `pubsub.proto` is the two parts `PubSubReq` and `PubSubResp`.
+
+First, create the `CmdKafkaFetch` command and add the required parameters. Then, register this command in the list of commands for `req` in `PubSubReq`, which is named `cmd_kafka_fetch`.
+
+```protobuf
+message CmdKafkaFetch {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+}
+
+message PubSubReq {
+    int64 sequence = 1;
+    oneof req {
+        CmdKafkaFetch cmd_kafka_fetch = 31;
+        // more commands
+    };
+}
+```
+
+Then create the corresponding response body `KafkaFetchResp` and register it in the `resp` of `PubSubResp`, named `kafka_fetch_resp`.
+
+```protobuf
+message KafkaFetchResp {
+    repeated KafkaMessage messages = 1;
+}
+
+message PubSubResp {
+    int64 sequence = 1;
+    oneof resp {
+        ErrorResp error_resp = 31;
+        KafkaFetchResp kafka_fetch_resp = 32;
+        // more responses
+    };
+}
+```
+
+#### Add a new option to the `scheme` configuration item in upstream
+
+Add a new option `kafka` to the `scheme` field enumeration in the `upstream` of `apisix/schema_def.lua`.
+
+```lua
+scheme = {
+    enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp", "kafka"},
+    -- other
+}
+```
+
+#### Add a new `scheme` judgment branch to `http_access_phase`
+
+Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because Apache Kafka has its own clustering and partition scheme, we do not need to use the Apache APISIX built-in load balancing algorithm, so we intercept and take over the processing flow before selecting the upstream node, here using the `kafka_access_phase` function.

Review Comment:
   fixed



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers. Instead, messages are relayed by a broker that caches the messages sent by producers and then either actively pushes them to subscribed consumers or lets consumers pull them. This pattern is often used in system architectures to decouple systems or to handle high-traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+Apache APISIX implements an extensible pubsub module that is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands. Support for new messaging systems can be added through this module.

Review Comment:
   fixed





[GitHub] [apisix] membphis commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
membphis commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871306648


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)

Review Comment:
   we can remove `nil` here





[GitHub] [apisix] spacewander commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871151592


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   Better to use break in the while loop and handle the common logic in the same place
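
   One possible reading of this suggestion, as a minimal sketch rather than the code that was eventually merged: every terminating condition leaves the loop with `break`, and the shared shutdown logic runs once after the loop. The `new_fake_ws` helper is a hypothetical stand-in for the WebSocket object so that the sketch runs on its own; it only mimics the `recv_frame`, `send_close` and `fatal` members used in the hunk above.

```lua
-- Hypothetical stand-in for the WebSocket server object; it replays a fixed
-- list of frames and then reports a fatal error, like a dropped connection.
local function new_fake_ws(frames)
    local i = 0
    local ws = { fatal = false, close_sent = 0 }

    function ws:recv_frame()
        i = i + 1
        local frame = frames[i]
        if not frame then
            self.fatal = true
            return nil, nil, "connection reset"
        end
        return frame.data, frame.type, nil
    end

    function ws:send_close()
        self.close_sent = self.close_sent + 1
    end

    return ws
end

-- Event loop with a single exit path: every terminating condition uses
-- `break`, and the shared shutdown logic runs once after the loop.
local function wait(ws)
    local fatal_err
    local handled = 0

    while true do
        local data, typ, err = ws:recv_frame()
        if err and ws.fatal then
            fatal_err = err
            break
        end
        if typ == "close" then
            break
        end
        if not err and typ == "binary" then
            handled = handled + 1 -- placeholder for decoding and dispatch
        end
    end

    -- common exit logic, reached from both the fatal and the close branch
    ws:send_close()
    return fatal_err, handled
end
```

   With this shape, `send_close` is called in exactly one place, and the caller can tell a client-initiated close (no error) from a fatal error by the first return value.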





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871227413


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   **This is not an error, so there is no return value.** (In fact, the return value of `wait` has also been removed, so the location of the error handling code is no longer confusing.) If the client initiates the close, the server also closes the connection and exits the loop directly, without logging or any subsequent processing. The other cases are now logged and handled after the loop.





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r872651141


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pubsub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ", data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            send_error_resp(ws, 0, "wrong command")
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " command, command: ", key)
+                    send_error_resp(ws, sequence, "unknown command: " .. key)
+                    break
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    send_error_resp(ws, sequence, err)

Review Comment:
   fixed #7043 
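
   For readers following the hunk above, the handler contract described in the doc comment of `on` (return the data, or `nil` plus an error message) can be sketched roughly as follows. This is an illustration under assumptions, not the module's actual code: `dispatch` is a hypothetical helper that returns the response table instead of encoding it and sending it over the WebSocket, and copying `sequence` into a successful response is assumed rather than taken from the quoted hunk.

```lua
-- Hypothetical helper: pick the handler for the single command carried by a
-- decoded request and build the response table. The real module sends the
-- response over the WebSocket instead of returning it.
local function dispatch(handlers, req)
    local sequence = req.sequence

    for key, value in pairs(req) do
        -- the request holds `sequence` plus exactly one command field
        if key ~= "sequence" then
            local handler = handlers[key]
            if not handler then
                return {
                    sequence = sequence,
                    error_resp = { code = 0, message = "unknown command: " .. key },
                }
            end

            -- handler contract: data on success, or nil plus an error string
            local resp, err = handler(value)
            if not resp then
                return {
                    sequence = sequence,
                    error_resp = { code = 0, message = err },
                }
            end

            resp.sequence = sequence -- assumption: echo the request sequence
            return resp
        end
    end
end
```

   A handler registered for `cmd_kafka_list_offset` that returns `nil, "test"` would therefore produce an `error_resp` whose `message` is `test`, which is the shape the test case in this PR checks for.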





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871008364


##########
docs/zh/latest/pubsub.md:
##########
@@ -0,0 +1,248 @@
+---
+title: Pub-Sub Framework
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pub-sub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
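+## Summary
+
+Publish-subscribe is a messaging paradigm in which:
+
+- Message producers do not send messages directly to message consumers; instead, a specific broker relays them
+- The broker caches the messages sent by producers, then either actively pushes them to subscribed consumers or lets consumers pull them
+
+This pattern is often used in system architectures to decouple systems or to handle high-traffic scenarios.
+
+In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. Combined with a publish-subscribe system, it can provide more powerful features, such as real-time collaboration on online documents and online games.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports communicating with clients over WebSocket. The client can be any program that supports WebSocket, with Protocol Buffer as the serialization mechanism; see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Currently supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)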

Review Comment:
   removed both of them





[GitHub] [apisix] tzssangglass commented on pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on PR #7028:
URL: https://github.com/apache/apisix/pull/7028#issuecomment-1124891377

   Everything else looks good to me, but CI is broken.




[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871017987


##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,116 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pub-sub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)
+            end
+        }
+    }
+--- response_body
+201passed
+
+
+
+=== TEST 2: hit route (with HTTP request)
+--- request
+GET /pubsub
+--- error_code: 400
+--- error_log
+failed to initialize pub-sub module, err: bad "upgrade" request header: nil
+
+
+
+=== TEST 3: connect websocket service
+--- config
+    location /t {
+        content_by_lua_block {
+            local lib_pubsub = require("lib.pubsub")
+            local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/pubsub")
+            local data = test_pubsub:send_recv_ws({
+                sequence = 0,
+                cmd_kafka_list_offset = {
+                    topic = "test",
+                    partition = 0,
+                    timestamp = -2,
+                },
+            })
+            if data and data.error_resp then
+                ngx.say("ret: ", data.error_resp.message)
+            end
+            test_pubsub:close_ws()
+        }
+    }
+--- response_body
+ret: test

Review Comment:
   The tests cover the testable log points and the possible error branches. In this PR the core code is only about 200 lines, the protocol definition (including comments) is 100+ lines, and the documentation is 240+ lines; the rest is test support helpers and test cases.





[GitHub] [apisix] spacewander commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871153855


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()

Review Comment:
   Should we always close the connection outside the loop?





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871302809


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   fixed



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()

Review Comment:
   fixed





[GitHub] [apisix] bzp2010 commented on pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on PR #7028:
URL: https://github.com/apache/apisix/pull/7028#issuecomment-1123681095

   ### Update
   
   The pubsub module core and its documentation were split out into the current PR, and the review comments from #6995 have been addressed.




[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871381746


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,211 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)

Review Comment:
   This follows the approach in grpc-transcode, which also uses pb.state(nil). According to the lua-protobuf documentation, pb.state(nil) and pb.state() behave differently.
   
   https://github.com/starwing/lua-protobuf/blob/master/README.md?plain=1#L208-L209





[GitHub] [apisix] spacewander commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r871150746


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback

Review Comment:
   The syntax is `@tparam type name desc`, see https://github.com/apache/apisix/blob/6ddca102503a52822407301fdffb869e32fe59e9/apisix/core/json.lua#L110
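
   For illustration, the doc block for `on` could follow that syntax roughly as below. This is only a sketch: the parameter names come from the function signature in the diff (`command`, `handler`), while the description wording is an assumption and not necessarily what the PR ends up with.

```lua
local _M = {}

---
-- Add a command callback to a pubsub module instance
--
-- @function core.pubsub.on
-- @tparam string command name of the command to add the callback for
-- @tparam function handler callback invoked as handler(params) on receipt of the command
-- @usage
-- pubsub:on(command, function (params)
--     return data, err
-- end)
function _M.on(self, command, handler)
    -- store the callback under the command name so the event loop can look it up
    self.cmd_handler[command] = handler
end
```

   Each `@tparam` line now reads type, then name, then description, which matches the order the documentation tooling expects.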



##########
.github/workflows/build.yml:
##########
@@ -31,7 +31,7 @@ jobs:
           - linux_openresty_1_17
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
           - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc

Review Comment:
   Alphabetically, `t/pubsub` sorts after `t/node` (and after `t/plugin`), so we should put it here, in the `t/node ...` group.



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return

Review Comment:
   It would be better to use `break` in the while loop here and handle the error in the same place.
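
   A minimal sketch of that shape, under stated assumptions: `ws` only needs `recv_frame()`, `send_close()` and a `fatal` flag, mirroring how the diff uses the websocket server object, and `handle_frame` is a hypothetical stand-in for the decode and dispatch steps. Logging is left out to keep the sketch self-contained, and `if`/`elseif` is used instead of `goto continue` so it also runs on plain Lua 5.1.

```lua
-- Sketch only, not the PR's code: both exit conditions use `break`,
-- and the close frame is sent in exactly one place after the loop.
local function wait(ws, handle_frame)
    local fatal_err
    while true do
        local raw_data, raw_type, err = ws:recv_frame()
        if err then
            if ws.fatal then
                -- remember the error; cleanup happens after the loop
                fatal_err = err
                break
            end
            -- non-fatal error: fall through and read the next frame
        elseif raw_type == "close" then
            -- the client closed the connection: same exit path, no error
            break
        else
            handle_frame(raw_type, raw_data)
        end
    end

    -- single exit point: always send the close frame here
    ws:send_close()
    return fatal_err
end
```

   With this layout the caller gets `nil` on a normal client close and the error string on a fatal error, and `send_close()` is never duplicated across branches.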



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pubsub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ", data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            send_error_resp(ws, 0, "wrong command")
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " command, command: ", key)
+                    send_error_resp(ws, sequence, "unknown command: " .. key)
+                    break
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    send_error_resp(ws, sequence, err)

Review Comment:
   It would be better to handle the `err` returned by the various `send_xxx` operations.
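
A minimal sketch of what that could look like, assuming the websocket object follows the lua-resty-websocket convention of returning `bytes, err` from `send_binary`. The helper name `send_resp` and the stub websocket are hypothetical and exist only so the example runs outside OpenResty; the payload is passed in already encoded.

```lua
-- Hypothetical helper: forwards an already-encoded payload and surfaces
-- the error from ws:send_binary instead of dropping it.
-- Assumes `ws:send_binary(data)` returns `bytes` on success or
-- `nil, err` on failure (the lua-resty-websocket convention).
local function send_resp(ws, payload)
    local bytes, err = ws:send_binary(payload)
    if not bytes then
        return nil, "failed to send response: " .. tostring(err)
    end
    return bytes
end

-- Stub websocket for illustration only; it fails on demand.
local function new_stub_ws(fail)
    return {
        send_binary = function(self, data)
            if fail then
                return nil, "broken pipe"
            end
            return #data
        end,
    }
end

local ok_bytes, ok_err = send_resp(new_stub_ws(false), "hello")
local bad_bytes, bad_err = send_resp(new_stub_ws(true), "hello")
```

The caller can then decide whether to log the error and continue, or leave the event loop.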



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,210 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: " .. err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+
+local function send_error_resp(ws, sequence, err_msg)
+    ws:send_binary(pb.encode("PubSubResp", {
+        sequence = sequence,
+        error_resp = {
+            code = 0,
+            message = err_msg,
+        },
+    }))
+end
+
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()

Review Comment:
   Should we always send the close frame outside the loop?
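
One possible reading of this, sketched under assumptions: every exit path breaks out of the loop, and the close frame is sent once after it. The stub below only mimics the method names used in the diff (`recv_frame`, `send_close`, `fatal`); `close_count` and the frame list are made up for the example. Whether a close frame can still be delivered after a fatal socket error is a separate question this sketch does not answer.

```lua
-- Sketch: single exit path, close frame sent exactly once after the loop.
local function wait(ws)
    local fatal_err
    while true do
        local data, typ, err = ws:recv_frame()
        if err and ws.fatal then
            fatal_err = err
            break
        end
        if not err then
            if typ == "close" then
                break
            end
            -- (command decoding and dispatch would go here)
        end
        -- non-fatal errors simply move on to the next frame
    end

    ws:send_close()
    return fatal_err
end

-- Stub websocket that replays a fixed list of frames (illustration only).
local function new_stub_ws(frames)
    local i = 0
    local ws = { fatal = false, close_count = 0 }
    function ws:recv_frame()
        i = i + 1
        local f = frames[i]
        if f.fatal then
            self.fatal = true
        end
        return f.data, f.typ, f.err
    end
    function ws:send_close()
        self.close_count = self.close_count + 1
        return 1
    end
    return ws
end

local closed = new_stub_ws({
    { data = "x", typ = "binary" },
    { typ = "close" },
})
local closed_err = wait(closed)

local broken = new_stub_ws({
    { err = "timeout" },                         -- non-fatal
    { err = "connection reset", fatal = true },  -- fatal
})
local broken_err = wait(broken)
```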



##########
t/pubsub/pubsub.t:
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: setup route by serverless
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    url = "/apisix/admin/routes/pubsub",
+                    data = {
+                        plugins = {
+                            ["serverless-pre-function"] = {
+                                phase = "access",
+                                functions =  {
+                                    [[return function(conf, ctx)
+                                        local core = require("apisix.core");
+                                        local pubsub, err = core.pubsub.new()
+                                        if not pubsub then
+                                            core.log.error("failed to initialize pubsub module, err: ", err)
+                                            core.response.exit(400)
+                                            return
+                                        end
+                                        pubsub:on("cmd_kafka_list_offset", function (params)
+                                            return nil, "test"
+                                        end)
+                                        pubsub:wait()
+                                        ngx.exit(0)
+                                    end]],
+                                }
+                            }
+                        },
+                        uri = "/pubsub"
+                    },
+                },
+            }
+
+            local t = require("lib.test_admin").test
+
+            for _, data in ipairs(data) do
+                local code, body = t(data.url, ngx.HTTP_PUT, data.data)
+                ngx.say(code..body)

Review Comment:
   It is better not to print the 201 status code directly, because when we rerun the test, code 200 will be returned instead.
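
A sketch of the usual alternative: only set the status when the request failed, and print the body alone. Here `out` is a hypothetical stand-in for `ngx` so the snippet runs outside OpenResty, and the `"passed"` body is illustrative.

```lua
-- Print only the body; surface the status code only on failure,
-- so a first run (201) and a rerun (200) produce the same output.
local function report(out, code, body)
    if code >= 300 then
        out.status = code
    end
    out.say(body)
end

-- Stub standing in for `ngx` (illustration only).
local function new_stub_out()
    local out = { status = 200, lines = {} }
    function out.say(s)
        out.lines[#out.lines + 1] = s
    end
    return out
end

local first = new_stub_out()
report(first, 201, "passed")

local rerun = new_stub_out()
report(rerun, 200, "passed")

local failed = new_stub_out()
report(failed, 404, "not found")
```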





[GitHub] [apisix] tzssangglass commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
tzssangglass commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870896543


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err

Review Comment:
   ```suggestion
               return "failed to load pubsub protocol: " .. err
   ```



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.

Review Comment:
   ```suggestion
   Publish-subscribe is a messaging paradigm:
   
   - Producers send messages to a specific broker rather than directly to consumers.
   - The broker caches messages sent by producers and then either actively pushes them to subscribed consumers or lets consumers pull them.
   
   System architectures often use this pattern to decouple systems or to handle high-traffic scenarios.
   ```
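
For readers new to the pattern, a toy in-memory broker may make the push and pull variants concrete. This is only an illustration of the paradigm, not how APISIX or Kafka implement it; every name here (`new_broker`, `subscribe`, `publish`, `pull`) is made up for the example.

```lua
local Broker = {}
Broker.__index = Broker

local function new_broker()
    return setmetatable({ queue = {}, subscribers = {} }, Broker)
end

-- Register a consumer callback for push delivery.
function Broker:subscribe(callback)
    self.subscribers[#self.subscribers + 1] = callback
end

-- Producers talk only to the broker: it caches the message,
-- then pushes it to every subscribed consumer.
function Broker:publish(msg)
    self.queue[#self.queue + 1] = msg
    for _, callback in ipairs(self.subscribers) do
        callback(msg)
    end
end

-- Pull delivery: a consumer asks for cached messages from an offset.
function Broker:pull(offset)
    local out = {}
    for i = offset, #self.queue do
        out[#out + 1] = self.queue[i]
    end
    return out
end

local broker = new_broker()
local pushed = {}
broker:subscribe(function(msg)
    pushed[#pushed + 1] = msg
end)
broker:publish("a")
broker:publish("b")
local pulled = broker:pull(2)
```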



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pub-sub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ",data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)

Review Comment:
   ```suggestion
                       log.error("pubsub callback handler not registered for the",
                           " command: ", key)
   ```
   
   Is this enough?



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.

Review Comment:
   ```suggestion
   Apache APISIX implements an extensible pubsub module that is responsible for starting the WebSocket server, encoding and decoding the communication protocol, and handling client commands. Support for new messaging systems can be added through it.
   ```
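
To show what "adding support through it" amounts to, here is a simplified sketch of the command dispatch, modeled on the `on` and `wait` functions in the diff but with the WebSocket and protobuf layers left out. `dispatch` is a hypothetical name, and `req` is a plain table playing the role of a decoded `PubSubReq` (a `sequence` plus one command field).

```lua
local Pubsub = {}
Pubsub.__index = Pubsub

local function new()
    return setmetatable({ cmd_handler = {} }, Pubsub)
end

-- Same shape as core.pubsub.on in the diff.
function Pubsub:on(command, handler)
    self.cmd_handler[command] = handler
end

-- Hypothetical: returns the response table instead of sending it.
function Pubsub:dispatch(req)
    for key, value in pairs(req) do
        if key ~= "sequence" then
            local handler = self.cmd_handler[key]
            if not handler then
                return {
                    sequence = req.sequence,
                    error_resp = { code = 0, message = "unknown command: " .. key },
                }
            end

            local resp, err = handler(value)
            if not resp then
                return {
                    sequence = req.sequence,
                    error_resp = { code = 0, message = err },
                }
            end

            -- write back the sequence
            resp.sequence = req.sequence
            return resp
        end
    end
end

local pubsub = new()
pubsub:on("cmd_kafka_fetch", function(params)
    return { kafka_fetch_resp = { messages = { params.topic } } }
end)

local ok_resp = pubsub:dispatch({
    sequence = 7,
    cmd_kafka_fetch = { topic = "test" },
})
local unknown_resp = pubsub:dispatch({
    sequence = 8,
    cmd_other = {},
})
```

A new messaging system then only needs to register its own handlers with `on`.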



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.
+
+### Basic Steps
+
+- Add new commands and response body definitions to `pubsub.proto`
+- Add a new option to the `scheme` configuration item in upstream
+- Add a new `scheme` judgment branch to `http_access_phase`
+- Implement the required message system instruction processing functions
+- Optional: Create plugins to support advanced configurations of this messaging system
+
+### the example of Apache Kafka
+
+#### Add new commands and response body definitions to `pubsub.proto`
+
+The core of the protocol definition in `pubsub.proto` is the two parts `PubSubReq` and `PubSubResp`.
+
+First, create the `CmdKafkaFetch` command and add the required parameters. Then, register this command in the list of commands for `req` in `PubSubReq`, which is named `cmd_kafka_fetch`.
+
+```protobuf
+message CmdKafkaFetch {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+}
+
+message PubSubReq {
+    int64 sequence = 1;
+    oneof req {
+        CmdKafkaFetch cmd_kafka_fetch = 31;
+        // more commands
+    };
+}
+```
+
+Then create the corresponding response body `KafkaFetchResp` and register it in the `resp` of `PubSubResp`, named `kafka_fetch_resp`.
+
+```protobuf
+message KafkaFetchResp {
+    repeated KafkaMessage messages = 1;
+}
+
+message PubSubResp {
+    int64 sequence = 1;
+    oneof resp {
+        ErrorResp error_resp = 31;
+        KafkaFetchResp kafka_fetch_resp = 32;
+        // more responses
+    };
+}
+```
+
+#### Add a new option to the `scheme` configuration item in upstream
+
+Add a new option `kafka` to the `scheme` field enumeration in the `upstream` of `apisix/schema_def.lua`.
+
+```lua
+scheme = {
+    enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp", "kafka"},
+    -- other
+}
+```
+
+#### Add a new `scheme` judgment branch to `http_access_phase`
+
+Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because of Apache Kafka has its own clustering and partition scheme, we do not need to use the Apache APISIX built-in load balancing algorithm, so we intercept and take over the processing flow before selecting the upstream node, here using the `kafka_access_phase` function.

Review Comment:
   ```suggestion
   Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because Apache Kafka has its own clustering and partition scheme, we do not need to use the Apache APISIX built-in load balancing algorithm, so we intercept and take over the processing flow before selecting the upstream node, here using the `kafka_access_phase` function.
   ```
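
The branch described in that paragraph could be sketched roughly as follows. Only `http_access_phase` and `kafka_access_phase` come from the doc; `pick_upstream_node`, the `matched_upstream` field, and the return values are assumptions made so the example is self-contained, and the real function in `apisix/init.lua` does much more.

```lua
-- Hypothetical stand-in for the normal path (built-in load balancing).
local function pick_upstream_node(api_ctx)
    api_ctx.picked = "node-from-balancer"
    return "proxied"
end

-- Named after the function mentioned in the doc; the body is a stub.
local function kafka_access_phase(api_ctx)
    api_ctx.handled_by = "kafka"
    return "pubsub"
end

local function http_access_phase(api_ctx)
    local upstream = api_ctx.matched_upstream
    -- Kafka has its own clustering and partitioning, so take over
    -- before any upstream node is selected.
    if upstream and upstream.scheme == "kafka" then
        return kafka_access_phase(api_ctx)
    end
    return pick_upstream_node(api_ctx)
end

local kafka_ctx = { matched_upstream = { scheme = "kafka" } }
local http_ctx = { matched_upstream = { scheme = "http" } }
local kafka_result = http_access_phase(kafka_ctx)
local http_result = http_access_phase(http_ctx)
```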



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pub-sub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ",data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)
+                    goto continue
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    ws:send_binary(pb.encode("PubSubResp", {
+                        sequence = sequence,
+                        error_resp = {
+                            code = 0,
+                            message = err,
+                        },
+                    }))
+                    goto continue
+                end
+
+                -- write back the sequence
+                resp.sequence = sequence
+                ws:send_binary(pb.encode("PubSubResp", resp))
+            end
+        end
+
+        ::continue::
+    end
+
+    log.error("failed to handle pub-sub command, err: websocket server: ", fatal_err)

Review Comment:
   This log looks a bit confusing.
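
One way to make the outcome less confusing, offered only as a sketch and not as the change that was made in the PR, is to let the loop return a single descriptive error string and leave logging to the caller. `fake_ws` below is a hypothetical stand-in that mimics only the `recv_frame`, `send_close`, and `fatal` names used in the quoted diff, and the error wording is an assumption.

```lua
-- Hypothetical stand-in for a websocket server object. It copies only the
-- recv_frame / send_close / fatal names that appear in the quoted diff.
local function fake_ws(frames)
    local i = 0
    local ws = { fatal = false, closed = false }

    function ws:recv_frame()
        i = i + 1
        local frame = frames[i]
        if not frame then
            -- running out of frames is treated as a fatal receive error
            self.fatal = true
            return nil, nil, "connection reset"
        end
        return frame.data, frame.type, nil
    end

    function ws:send_close()
        self.closed = true
    end

    return ws
end

-- Simplified event loop: returns nil on a clean close, or one descriptive
-- error string when a fatal receive error ends the loop.
local function wait(ws)
    while true do
        local _, raw_type, err = ws:recv_frame()
        if err then
            if ws.fatal then
                ws:send_close()
                return "websocket server: fatal error while receiving frame: " .. err
            end
            -- non-fatal errors are simply skipped in this sketch
        elseif raw_type == "close" then
            ws:send_close()
            return nil
        end
    end
end

local err = wait(fake_ws({ { type = "binary", data = "x" } }))
print(err)
```

With this shape the caller decides whether and how to log, so the message is built in one place.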



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.

Review Comment:
   ```suggestion
   In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. Combining it with a publish-subscribe system, we can achieve more robust features, such as real-time collaboration on online documents, online games, etc.
   ```





[GitHub] [apisix] spacewander commented on pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander commented on PR #7028:
URL: https://github.com/apache/apisix/pull/7028#issuecomment-1124460661

   > ### Update
   > 
   > The pubsub module now uses its own module-level `pb_state`, just like the `pb_state` that `grpc-transcode` caches in lrucache, and switches to this cached `pb_state` before decoding each command.
   > 
   > **What I'm not sure about is whether, in the extreme case, `grpc-transcode` and pubsub switching `pb_state` concurrently will cause confusion. 🤔**
   
   There won't be a concurrency issue, as pubsub only modifies the global pb_state when APISIX starts.
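
The swap-and-restore idea discussed here can be illustrated with a small sketch. It assumes the decode step does not yield between installing the state and restoring it. The `pb` table below is a hypothetical stand-in, not the real lua-protobuf module, and `with_state` is an illustrative helper name that does not appear in the PR.

```lua
-- Hypothetical stand-in for lua-protobuf's pb.state(), so this sketch runs
-- without the real library. With an argument it installs that state (or a
-- fresh one for nil) and returns the previous state; with no argument it
-- only returns the current state.
local current = { name = "default" }
local pb = {}

function pb.state(...)
    local old = current
    if select("#", ...) > 0 then
        local new_state = ...
        current = new_state or { name = "fresh" }
    end
    return old
end

-- Run fn with `state` installed, then restore whatever was active before.
-- No yielding call happens between the swap and the restore.
local function with_state(state, fn, ...)
    local old = pb.state(state)
    local ok, res = pcall(fn, ...)
    pb.state(old)
    if not ok then
        return nil, res
    end
    return res
end

local pubsub_state = { name = "pubsub" }
local seen = with_state(pubsub_state, function()
    return pb.state().name
end)
print(seen, pb.state().name)
```

Because the previous state is restored even when `fn` raises an error, other users of the shared state see it unchanged once the call returns.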




[GitHub] [apisix] spacewander merged pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
spacewander merged PR #7028:
URL: https://github.com/apache/apisix/pull/7028




[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870915900


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pub-sub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ",data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)
+                    goto continue
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    ws:send_binary(pb.encode("PubSubResp", {
+                        sequence = sequence,
+                        error_resp = {
+                            code = 0,
+                            message = err,
+                        },
+                    }))
+                    goto continue
+                end
+
+                -- write back the sequence
+                resp.sequence = sequence
+                ws:send_binary(pb.encode("PubSubResp", resp))
+            end
+        end
+
+        ::continue::
+    end
+
+    log.error("failed to handle pub-sub command, err: websocket server: ", fatal_err)

Review Comment:
   Changed to make it clearer.



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)

Review Comment:
   changed



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err

Review Comment:
   changed





[GitHub] [apisix] bzp2010 commented on a diff in pull request #7028: feat: add pubsub framework

Posted by GitBox <gi...@apache.org>.
bzp2010 commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870915770


##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.

Review Comment:
   fixed



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.

Review Comment:
   changed


