You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/16 08:03:27 UTC

[camel] branch main updated (fed7c7e -> 40abf31)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from fed7c7e  Sync deps
     new 36b2bda  CAMEL-17762: allow defining a custom poll duration on the Kafka resume strategy
     new 4d0d4b1  CAMEL-17762: fixed inverted documentation for the resumable test
     new ec9a2a1  CAMEL-17762: simplify resumable presence check
     new 40abf31  CAMEL-17762: adjust overly verbose debug message

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/camel/component/file/FileConsumer.java  |  2 +-
 .../apache/camel/component/file/consumer/FileResumeSet.java |  6 ++----
 .../processor/resume/kafka/AbstractKafkaResumeStrategy.java | 13 +++++++++++--
 .../src/main/java/org/apache/camel/ResumableSet.java        |  6 +++---
 .../apache/camel/processor/resume/ResumableCompletion.java  |  6 +++---
 5 files changed, 20 insertions(+), 13 deletions(-)

[camel] 03/04: CAMEL-17762: simplify resumable presence check

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ec9a2a1532f9151ae86074af5d0cd97b47b8afdc
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 15 13:01:22 2022 +0100

    CAMEL-17762: simplify resumable presence check
---
 .../src/main/java/org/apache/camel/component/file/FileConsumer.java | 2 +-
 .../org/apache/camel/component/file/consumer/FileResumeSet.java     | 6 ++----
 core/camel-api/src/main/java/org/apache/camel/ResumableSet.java     | 2 +-
 3 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 1f5ca77..bd79aadb 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -175,7 +175,7 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             FileResumeSet resumeSet = new FileResumeSet(dirFiles);
             resumeStrategy.resume(resumeSet);
 
-            return resumeSet.hasResumables() ? resumeSet.resumed() : dirFiles;
+            return resumeSet.resumed();
         }
 
         return dirFiles;
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
index 0493186..f746c0f 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
@@ -59,10 +59,8 @@ public final class FileResumeSet implements ResumableSet<File> {
      * @return true if there are resumable files or false otherwise
      */
     public boolean hasResumables() {
-        if (outputFiles != null && outputFiles.length > 0) {
-            if (outputFiles.length != inputFiles.length) {
-                return true;
-            }
+        if (outputFiles != inputFiles) {
+            return true;
         }
 
         return false;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java b/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
index 1843063..0a18ba9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
@@ -49,7 +49,7 @@ public interface ResumableSet<T> {
             }
         }
 
-        if (count > 0 && count != input.length) {
+        if (count != input.length) {
             return Arrays.copyOf(tmp, count);
         }
 

[camel] 02/04: CAMEL-17762: fixed inverted documentation for the resumable test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4d0d4b1f93db91b80a5462a61f2aca05861fc88b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 15 12:59:27 2022 +0100

    CAMEL-17762: fixed inverted documentation for the resumable test
---
 core/camel-api/src/main/java/org/apache/camel/ResumableSet.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java b/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
index 898c997..1843063 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumableSet.java
@@ -34,7 +34,7 @@ public interface ResumableSet<T> {
      * @param  input          the input array to check for resumables
      * @param  resumableCheck a checker method that returns true if a single entry of the input should be resumed or
      *                        false otherwise. For instance: given a set A, B and C, where B has already been processed,
-     *                        then a test for A and C returns false, whereas a test for B returns true.
+     *                        then a test for A and C returns true, whereas a test for B returns false.
      * @return                a new array containing the elements that still need to be processed
      */
     default T[] resumeEach(T[] input, Predicate<T> resumableCheck) {
@@ -61,7 +61,7 @@ public interface ResumableSet<T> {
      *
      * @param resumableCheck a checker method that returns true if a single entry of the input should be resumed or
      *                       false otherwise. For instance: given a set A, B and C, where B has already been processed,
-     *                       then a test for A and C returns false, whereas a test for B returns true.
+     *                       then a test for A and C returns true, whereas a test for B returns false.
      */
     void resumeEach(Predicate<T> resumableCheck);
 

[camel] 04/04: CAMEL-17762: adjust overly verbose debug message

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 40abf3166c7472e1628b03370503655a821fb571
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 15 15:22:49 2022 +0100

    CAMEL-17762: adjust overly verbose debug message
---
 .../java/org/apache/camel/processor/resume/ResumableCompletion.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index 5b213cb..f80f71e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -46,9 +46,9 @@ public class ResumableCompletion implements Synchronization {
         if (offset instanceof Resumable) {
             Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Processing the resumable: {}", resumable.getAddressable());
-                LOG.debug("Processing the resumable of type: {}", resumable.getLastOffset().offset());
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing the resumable: {}", resumable.getAddressable());
+                LOG.trace("Processing the resumable of type: {}", resumable.getLastOffset().offset());
             }
 
             if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {

[camel] 01/04: CAMEL-17762: allow defining a custom poll duration on the Kafka resume strategy

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 36b2bda3c17c3eb69abc1f7e1ac8f249f07ff000
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 15 12:58:47 2022 +0100

    CAMEL-17762: allow defining a custom poll duration on the Kafka resume strategy
---
 .../processor/resume/kafka/AbstractKafkaResumeStrategy.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index 079d936..80c2cbf 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -60,6 +61,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     private Consumer<K, V> consumer;
     private Producer<K, V> producer;
     private long errorCount;
+    private Duration pollDuration = Duration.ofSeconds(1);
 
     private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
     private final ResumeCache<K, V> resumeCache;
@@ -202,7 +204,6 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         }
     }
 
-    // TODO: bad method ...
     /**
      * @param topic the topic to consume the messages from
      */
@@ -260,7 +261,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
     public ConsumerRecords<K, V> consume(int retries) {
         while (retries > 0) {
-            ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
+            ConsumerRecords<K, V> records = consumer.poll(pollDuration);
             if (!records.isEmpty()) {
                 return records;
             }
@@ -317,4 +318,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
             LOG.error("Failed to load already processed items: {}", e.getMessage(), e);
         }
     }
+
+    public Duration getPollDuration() {
+        return pollDuration;
+    }
+
+    public void setPollDuration(Duration pollDuration) {
+        this.pollDuration = Objects.requireNonNull(pollDuration, "The poll duration cannot be null");
+    }
 }