You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/25 06:52:31 UTC

[camel] branch main updated: CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cd3136bbf5 CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)
5cd3136bbf5 is described below

commit 5cd3136bbf53ba163123ba6c831b37c795c0827f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jul 25 08:52:24 2023 +0200

    CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)
---
 .../aws2/ddbstream/Ddb2StreamConsumer.java         | 28 ++++++++++++++++------
 .../aws2/ddbstream/Ddb2StreamEndpoint.java         |  1 -
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 7cc4714b580..d6b0b1b8da1 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -25,12 +25,12 @@ import java.util.Queue;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
@@ -99,16 +99,29 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     public int processBatch(Queue<Object> exchanges) throws Exception {
-        int processedExchanges = 0;
-        while (!exchanges.isEmpty()) {
-            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+        int total = exchanges.size();
+        int answer = 0;
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            // use poll to remove the head so it does not consume memory even
+            // after we have processed it
+            Exchange exchange = (Exchange) exchanges.poll();
+            // add current index and total as properties
+            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
 
             // use default consumer callback
             AsyncCallback cb = defaultConsumerCallback(exchange, true);
             getAsyncProcessor().process(exchange, cb);
-            processedExchanges++;
+            answer++;
         }
-        return processedExchanges;
+
+        return answer;
     }
 
     @Override
@@ -122,7 +135,8 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
 
         if (healthCheckRepository != null) {
             consumerHealthCheck = new Ddb2StreamConsumerHealthCheck(this, getRouteId());
-            consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled() && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
+            consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled()
+                    && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
             healthCheckRepository.addHealthCheck(consumerHealthCheck);
         }
     }
diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 261ab43c344..715234651ed 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.aws2.ddbstream;
 import java.net.URI;
 
 import org.apache.camel.Category;
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;