You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/24 18:25:45 UTC

[camel] branch batch created (now 8a5db34e166)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch batch
in repository https://gitbox.apache.org/repos/asf/camel.git


      at 8a5db34e166 CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed.

This branch includes the following new commits:

     new 8a5db34e166 CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[camel] 01/01: CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch batch
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a5db34e166c4f337f11f52b6a3420f8518bd460
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jul 24 20:25:30 2023 +0200

    CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed.
---
 .../aws2/ddbstream/Ddb2StreamConsumer.java         | 28 ++++++++++++++++------
 .../aws2/ddbstream/Ddb2StreamEndpoint.java         |  1 -
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 7cc4714b580..d6b0b1b8da1 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -25,12 +25,12 @@ import java.util.Queue;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Processor;
 import org.apache.camel.health.HealthCheckHelper;
 import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
@@ -99,16 +99,29 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     public int processBatch(Queue<Object> exchanges) throws Exception {
-        int processedExchanges = 0;
-        while (!exchanges.isEmpty()) {
-            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+        int total = exchanges.size();
+        int answer = 0;
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            // use poll to remove the head so it does not consume memory even
+            // after we have processed it
+            Exchange exchange = (Exchange) exchanges.poll();
+            // add current index and total as properties
+            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
 
             // use default consumer callback
             AsyncCallback cb = defaultConsumerCallback(exchange, true);
             getAsyncProcessor().process(exchange, cb);
-            processedExchanges++;
+            answer++;
         }
-        return processedExchanges;
+
+        return answer;
     }
 
     @Override
@@ -122,7 +135,8 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
 
         if (healthCheckRepository != null) {
             consumerHealthCheck = new Ddb2StreamConsumerHealthCheck(this, getRouteId());
-            consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled() && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
+            consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled()
+                    && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
             healthCheckRepository.addHealthCheck(consumerHealthCheck);
         }
     }
diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 261ab43c344..715234651ed 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.aws2.ddbstream;
 import java.net.URI;
 
 import org.apache.camel.Category;
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;