Posted to commits@camel.apache.org by "orpiske (via GitHub)" <gi...@apache.org> on 2024/01/23 15:48:12 UTC

[PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

orpiske opened a new pull request, #12879:
URL: https://github.com/apache/camel/pull/12879

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1463725683


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   This may be noisy if you have a recurring error, because Kafka batching will keep polling the same messages as the offset is not moved forward.
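To make the reviewer's concern concrete, here is a small self-contained model (not camel-kafka code) of a batch consumer that commits only when the whole batch succeeds. It assumes that after a failed batch the consumer re-reads from the last committed offset, as the comment implies. `StuckBatchModel`, `Result` and the `"poison"` marker record are hypothetical names used only for illustration.

```java
import java.util.List;

/**
 * Hypothetical model (not camel-kafka code): a batch consumer that commits only
 * when the whole batch succeeds and, on failure, re-reads from the last committed offset.
 */
final class StuckBatchModel {
    /** How many failure warnings were emitted and where the committed offset ended up. */
    record Result(int failureWarnings, int committedOffset) {}

    static Result run(List<String> records, int batchSize, int pollAttempts) {
        int committed = 0;
        int warnings = 0;
        for (int attempt = 0; attempt < pollAttempts && committed < records.size(); attempt++) {
            // Assumption: every poll starts again from the committed offset
            List<String> batch = records.subList(committed, Math.min(committed + batchSize, records.size()));
            if (batch.contains("poison")) {
                warnings++; // onFailure: skip the commit and log a warning
            } else {
                committed += batch.size(); // onComplete: commit the batch
            }
        }
        return new Result(warnings, committed);
    }
}
```

With a record that always fails, the warning count grows with every poll attempt while the committed offset never moves past the failing batch, which is the recurring noise the comment describes.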
   
    





Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "orpiske (via GitHub)" <gi...@apache.org>.
orpiske merged PR #12879:
URL: https://github.com/apache/camel/pull/12879




Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "orpiske (via GitHub)" <gi...@apache.org>.
orpiske commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1464809515


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   BTW, based on your suggestion, I think I should also adjust the code below, which contains the error handling used in streaming mode.





Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #12879:
URL: https://github.com/apache/camel/pull/12879#issuecomment-1906353511

   :star2: Thank you for your contribution to the Apache Camel project! :star2: 
   
   :robot: CI automation will test this PR automatically.
   
   :camel: Apache Camel Committers, please review the following items:
   
   * First-time contributors **require MANUAL approval** for the GitHub Actions to run
   
   * You can use the command `/component-test (camel-)component-name1 (camel-)component-name2..` to request a test from the test bot.
   
   * You can label PRs using `build-all`, `build-dependents`, `skip-tests` and `test-dependents` to fine-tune the checks executed by this PR.
   
   * Build and test logs are available in the Summary page. **Only** [Apache Camel committers](https://camel.apache.org/community/team/#committers) have access to the summary. 
   
   * :warning: Be careful when sharing logs. Review their contents before sharing them publicly.




Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "orpiske (via GitHub)" <gi...@apache.org>.
orpiske commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1464807362


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   > Other components use `consumer.getExceptionHandler().handleException(...)`, which is usually called when there was an unhandled exception, and the default implementation just logs a WARN.
   
   Understood. I think I got your point. 
   
   > Is Kafka not a lot faster, or how slow is that rate?
   
   In theory, the rate (of the log message) should be something similar to `(producer rate / batch size) - consumption delay`. In other words, the larger the batch size, the lower the incidence.





Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "orpiske (via GitHub)" <gi...@apache.org>.
orpiske commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1463974918


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   Indeed! 
   
   However, it will only happen if the user is not handling the error correctly (as described in the documentation).
   
   We could add a flag so that it logs only once, but I also feel that it may then be hard for users to notice the problem. Maybe we could do that and also increase the log level to ERROR, so that users can find it more easily with `grep` or their observability tools.
   
   Do you have any suggestions on how we could do this more elegantly? (Or, if we have solved this problem in some other component, just let me know and I'll do the same.)
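A minimal sketch of the "log only once, at ERROR level" idea, assuming the flag is reset whenever a batch completes successfully so that a new failure streak is reported again. `FailureLogThrottle` is a hypothetical helper, and it uses the JDK's `System.Logger` rather than whatever logger camel-kafka actually uses.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical helper: logs the first failure of a streak at ERROR and
 * suppresses repeats until a batch completes successfully again.
 */
final class FailureLogThrottle {
    private static final System.Logger LOG = System.getLogger(FailureLogThrottle.class.getName());
    private final AtomicBoolean failureReported = new AtomicBoolean(false);

    /** Called from onFailure; returns true if this call actually logged. */
    boolean onFailure(String exchangeId) {
        // compareAndSet makes the check-and-flip atomic if several threads fail at once
        if (failureReported.compareAndSet(false, true)) {
            LOG.log(System.Logger.Level.ERROR,
                    "Skipping auto-commit on the batch because processing exchange {0} failed; "
                            + "further failures are suppressed until a batch succeeds",
                    exchangeId);
            return true;
        }
        return false;
    }

    /** Called from onComplete: a successful batch ends the failure streak. */
    void onComplete() {
        failureReported.set(false);
    }
}
```

The trade-off raised in the comment still applies: only the first failure of a streak reaches the logs, so the ERROR level is what keeps that single line easy to find.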
   





Re: [PR] CAMEL-19241: implement support for auto-commits with batching in camel-kafka [camel]

Posted by "davsclaus (via GitHub)" <gi...@apache.org>.
davsclaus commented on code in PR #12879:
URL: https://github.com/apache/camel/pull/12879#discussion_r1464521542


##########
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java:
##########
@@ -43,6 +44,28 @@ final class KafkaRecordBatchingProcessor extends KafkaRecordProcessor {
     private final Processor processor;
     private final CommitManager commitManager;
 
+    private record CommitSynchronization(CommitManager commitManager) implements Synchronization {
+
+        @Override
+        public void onComplete(Exchange exchange) {
+            final List<?> exchanges = exchange.getMessage().getBody(List.class);
+
+            // Ensure we are actually receiving what we are asked for
+            if (exchanges == null || exchanges.isEmpty()) {
+                LOG.warn("The exchange is {}", exchanges == null ? "not of the expected type (null)" : "empty");
+                return;
+            }
+
+            LOG.debug("Calling commit on {} exchanges using {}", exchanges.size(), commitManager.getClass().getSimpleName());
+            commitManager.commit();
+        }
+
+        @Override
+        public void onFailure(Exchange exchange) {
+            LOG.warn("Skipping auto-commit on the batch because processing the exchanged has failed");

Review Comment:
   Other components use `consumer.getExceptionHandler().handleException(...)`, which is usually called when there was an unhandled exception, and the default implementation just logs a WARN.
   
   So if you do
   
   from file
      throw exception
   
   it will also just keep being noisy, but it only does this every 0.5 sec (by default).
   
   Is Kafka not a lot faster, or how slow is that rate?
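A sketch of the approach suggested here: instead of logging directly in `onFailure`, hand the failure to an exception handler so that users can plug in their own behavior. In Camel the handler would come from `consumer.getExceptionHandler()`; to keep the example self-contained, `BatchExceptionHandler`, `WarnLoggingHandler` and `DelegatingCommitHook` are hypothetical, simplified stand-ins (the first mirrors only a `(String message, Throwable exception)` overload), and the commit is modelled as a plain `Runnable`.

```java
/** Hypothetical, simplified stand-in for Camel's consumer exception handler. */
interface BatchExceptionHandler {
    void handleException(String message, Throwable exception);
}

/** Stand-in for the default handler which, per the review comment, just logs a WARN. */
final class WarnLoggingHandler implements BatchExceptionHandler {
    private static final System.Logger LOG = System.getLogger(WarnLoggingHandler.class.getName());

    @Override
    public void handleException(String message, Throwable exception) {
        LOG.log(System.Logger.Level.WARNING, message, exception);
    }
}

/** Hypothetical commit hook: commit on success, delegate failures to the handler. */
final class DelegatingCommitHook {
    private final Runnable commit; // stands in for commitManager.commit()
    private final BatchExceptionHandler exceptionHandler;

    DelegatingCommitHook(Runnable commit, BatchExceptionHandler exceptionHandler) {
        this.commit = commit;
        this.exceptionHandler = exceptionHandler;
    }

    void onComplete() {
        commit.run();
    }

    void onFailure(Throwable cause) {
        // No commit: the handler decides how (and how loudly) to report the failure
        exceptionHandler.handleException(
                "Skipping auto-commit on the batch because processing the exchange has failed", cause);
    }
}
```

Routing the failure through a handler keeps a WARN log as the default while letting users swap in something quieter or louder, which matches how the comment says other components behave.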


