You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/25 06:52:31 UTC
[camel] branch main updated: CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5cd3136bbf5 CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)
5cd3136bbf5 is described below
commit 5cd3136bbf53ba163123ba6c831b37c795c0827f
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Jul 25 08:52:24 2023 +0200
CAMEL-16837: aws2-ddbstream consumer should have batch metadata on each exchange processed. (#10811)
---
.../aws2/ddbstream/Ddb2StreamConsumer.java | 28 ++++++++++++++++------
.../aws2/ddbstream/Ddb2StreamEndpoint.java | 1 -
2 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 7cc4714b580..d6b0b1b8da1 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -25,12 +25,12 @@ import java.util.Queue;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.health.HealthCheckHelper;
import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
-import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.ExpiredIteratorException;
@@ -99,16 +99,29 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
- int processedExchanges = 0;
- while (!exchanges.isEmpty()) {
- final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+ int total = exchanges.size();
+ int answer = 0;
+
+ for (int index = 0; index < total && isBatchAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ // use poll to remove the head so it does not consume memory even
+ // after we have processed it
+ Exchange exchange = (Exchange) exchanges.poll();
+ // add current index and total as properties
+ exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, index);
+ exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, total);
+ exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, index == total - 1);
+
+ // update pending number of exchanges
+ pendingExchanges = total - index - 1;
// use default consumer callback
AsyncCallback cb = defaultConsumerCallback(exchange, true);
getAsyncProcessor().process(exchange, cb);
- processedExchanges++;
+ answer++;
}
- return processedExchanges;
+
+ return answer;
}
@Override
@@ -122,7 +135,8 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
if (healthCheckRepository != null) {
consumerHealthCheck = new Ddb2StreamConsumerHealthCheck(this, getRouteId());
- consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled() && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
+ consumerHealthCheck.setEnabled(getEndpoint().getComponent().isHealthCheckEnabled()
+ && getEndpoint().getComponent().isHealthCheckConsumerEnabled());
healthCheckRepository.addHealthCheck(consumerHealthCheck);
}
}
diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 261ab43c344..715234651ed 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.aws2.ddbstream;
import java.net.URI;
import org.apache.camel.Category;
-import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;