You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@apisix.apache.org by GitBox <gi...@apache.org> on 2022/01/26 09:01:15 UTC

[GitHub] [apisix] spacewander commented on a change in pull request #6203: feat(batchprocessor): support partial consumption of batch entries

spacewander commented on a change in pull request #6203:
URL: https://github.com/apache/apisix/pull/6203#discussion_r792425692



##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]

Review comment:
       We can use offset to avoid counting len?

##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been
+    --- successfully consumed and hence reschedule the job for entries with index n to #entries
+    --- based on the current retry policy.
+    local ok, err, n = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if n then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                            #batch.entries + 1 - (n or -1), "/", #batch.entries ,"]: ", err)

Review comment:
       The `n` can't be `nil`, so `n or -1` is unnecessary.

##########
File path: apisix/utils/batch-processor.lua
##########
@@ -63,17 +63,36 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    for i = n or 1, #batch, 1 do
+        slice[#slice+1] = batch[i]
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    --- In case of "err" and a valid "n" batch processor considers, all n-1 entries have been

Review comment:
       Would be `first_fail` better than `n` as the var name?

##########
File path: apisix/plugins/datadog.lua
##########
@@ -108,127 +109,144 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                    .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id =  svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according dogstatsd udp data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-        if ctx.route_name and ctx.route_name ~= "" then
-            entry.route_id = ctx.route_name
-        end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix ..
+                                    "request.counter", 1, "c", suffix))

Review comment:
       Let's indent the line to `(`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@apisix.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org