Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/01/25 19:38:11 UTC

[GitHub] [apisix] bisakhmondal opened a new pull request #6203: feat(batchprocessor): Support partial consumption of entries

bisakhmondal opened a new pull request #6203:
URL: https://github.com/apache/apisix/pull/6203


   Signed-off-by: Bisakh Mondal <bi...@gmail.com>
   
   
   
   ### What this PR does / why we need it:
   <!--- Why is this change required? What problem does it solve? -->
   <!--- If it fixes an open issue, please link to the issue here. -->
   
   This is helpful when the batch processor entries can't be processed in one go (transactionally) and an error occurs in the middle of consumption. With this PR, the batch processor takes a hint from the consumer so that already-consumed entries are not retried in the next run.
   
   Use case: https://github.com/apache/apisix/pull/6113#discussion_r790424879, the datadog plugin, etc. (where entries can't be processed in bulk)
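   
   As a rough illustration, here is a hedged sketch of how a consumer's process callback could signal partial consumption under the API added by this PR (the `send_entry` helper is a hypothetical stand-in for plugin-specific delivery logic):
   
       -- Hypothetical consumer callback registered with the batch processor.
       -- It returns ok, err and, on partial failure, the index of the first
       -- entry that was NOT consumed, so only the remaining entries are retried.
       local function process(entries, batch_max_size)
           for i = 1, #entries do
               local ok, err = send_entry(entries[i])
               if not ok then
                   -- entries 1 .. i-1 succeeded; ask the processor to retry from index i
                   return false, err, i
               end
           end
           return true
       end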
   
   ### Pre-submission checklist:
   
   <!--
   Please follow the PR manners:
   1. Use Draft if the PR is not ready to be reviewed
   2. Test is required for the feat/fix PR, unless you have a good reason
   3. Doc is required for the feat PR
   4. Use a new commit to resolve review instead of `push -f`
   5. If you need to resolve merge conflicts after the PR is reviewed, please merge master but do not rebase
   6. Use "request review" to notify the reviewer once you have resolved the review
   7. Only reviewer can click "Resolve conversation" to mark the reviewer's review resolved
   -->
   
   * [x] Did you explain what problem this PR solves, or what new features have been added?
   * [x] Have you added corresponding test cases?
   * [ ] Have you modified the corresponding document?
   * [x] Is this PR backward compatible? **If it is not backward compatible, please discuss on the [mailing list](https://github.com/apache/apisix/tree/master#community) first**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [apisix] spacewander merged pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
spacewander merged pull request #6203:
URL: https://github.com/apache/apisix/pull/6203


   





[GitHub] [apisix] bisakhmondal commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792500662



##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been
+    --- successfully consumed and hence reschedule the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                            #batch.entries + 1 - (n or -1), "/", #batch.entries ,"]: ", err)

Review comment:
       For it not to be nil, we would have to update all existing consumer implementations. It would also force future implementations to return something like `false, err, 1` even when they consume the entries in one go and hit an error. Do we really need that complication? I think it's better to leave it to the consumers themselves, whether they want to use this subtle feature or not. What do you say?
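
       For context, a minimal sketch of the backward-compatible shape being argued for here (`send_bulk` is a hypothetical stand-in for a consumer that ships the whole batch in one call):

           -- Existing-style consumer: consumes the batch in one go and simply
           -- omits the third return value; on error the whole batch is retried.
           local function process(entries, batch_max_size)
               local ok, err = send_bulk(entries)
               if not ok then
                   return false, err
               end
               return true
           end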







[GitHub] [apisix] spacewander commented on pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
spacewander commented on pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#issuecomment-1021995377


   Don't forget to mention it in the doc.





[GitHub] [apisix] spacewander commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792425692



##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]

Review comment:
       We can use an offset to avoid counting the length?
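
       A sketch of what that might look like, using an explicit write index instead of recomputing `#slice` on every insertion (assuming `n`, when given, is >= 1):

           local function slice_batch(batch, n)
               local slice = {}
               local idx = 1
               for i = n or 1, #batch do
                   slice[idx] = batch[i]
                   idx = idx + 1
               end
               return slice
           end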

##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been
+    --- successfully consumed and hence reschedule the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                            #batch.entries + 1 - (n or -1), "/", #batch.entries ,"]: ", err)

Review comment:
       The `n` can't be `nil`, so `n or -1` is unnecessary.

##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been

Review comment:
       Would `first_fail` be better than `n` as the var name?
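
       i.e., a fragment sketch with the rename applied (how the remaining entries get rescheduled is assumed here, not quoted from the PR):

           local ok, err, first_fail = self.func(batch.entries, self.batch_max_size)
           if not ok and first_fail then
               -- entries 1 .. first_fail-1 succeeded; keep only the rest for retry
               batch.entries = slice_batch(batch.entries, first_fail)
           end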

##########
File path: apisix/plugins/datadog.lua
##########
@@ -108,127 +109,144 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                    .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id =  svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according dogstatsd udp data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-        if ctx.route_name and ctx.route_name ~= "" then
-            entry.route_id = ctx.route_name
-        end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix ..
+                                    "request.counter", 1, "c", suffix))

Review comment:
       Let's indent the line to `(`







[GitHub] [apisix] spacewander commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
spacewander commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r793197231



##########
File path: apisix/plugins/datadog.lua
##########
@@ -108,127 +109,144 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                   .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id =  svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according dogstatsd udp data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-        if ctx.route_name and ctx.route_name ~= "" then
-            entry.route_id = ctx.route_name
-        end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix ..
+                       "request.counter", 1, "c", suffix))

Review comment:
       My bad! Actually, I meant alignment like:
   https://github.com/apache/apisix/blob/615ee41312282ca2375abddae1d074637a901d89/apisix/upstream.lua#L274-L275
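
       i.e., aligning the continuation with the opening parenthesis of the call, roughly like this (a formatting sketch, not the final merged code):

           ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.counter",
                                      1, "c", suffix))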







[GitHub] [apisix] bisakhmondal commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r793275311



##########
File path: apisix/plugins/datadog.lua
##########
@@ -108,127 +109,144 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                   .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id =  svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according dogstatsd udp data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-        if ctx.route_name and ctx.route_name ~= "" then
-            entry.route_id = ctx.route_name
-        end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix ..
+                       "request.counter", 1, "c", suffix))

Review comment:
       Fixed







[GitHub] [apisix] bisakhmondal commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792501257



##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been
+    --- successfully consumed and hence reschedule the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                            #batch.entries + 1 - (n or -1), "/", #batch.entries ,"]: ", err)

Review comment:
       Oh damn, got your point. It's the if block, haha
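
       For the record, a sketch of the branch being discussed with the redundant fallback dropped (inside `if n then`, `n` is known to be non-nil):

           core.log.error("Batch Processor[", self.name, "] failed to process entries [",
                          #batch.entries + 1 - n, "/", #batch.entries, "]: ", err)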







[GitHub] [apisix] bisakhmondal commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

Posted by GitBox <gi...@apache.org>.
bisakhmondal commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792496732



##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]

Review comment:
       Oh, I see. I thought the length operator was an O(1) operation, but it seems it gets recomputed in O(log n) on each invocation.
   Nice nitpicking :+1: 

##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been

Review comment:
       Yes, definitely



