Posted to commits@camel.apache.org by da...@apache.org on 2019/11/21 12:42:58 UTC

[camel] branch master updated: CAMEL-14199 : camel-hdfs - Add maxMessagesPerPoll for Consumer (#3356)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 227c2b4  CAMEL-14199 : camel-hdfs - Add maxMessagesPerPoll for Consumer (#3356)
227c2b4 is described below

commit 227c2b462a6d424ad5d17c075558bcb60d1451b2
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Thu Nov 21 13:42:44 2019 +0100

    CAMEL-14199 : camel-hdfs - Add maxMessagesPerPoll for Consumer (#3356)
    
    CAMEL-14199 - camel-hdfs - Add maxMessagesPerPoll for Consumer
---
 .../camel-hdfs/src/main/docs/hdfs-component.adoc   |  3 +-
 .../camel/component/hdfs/HdfsConfiguration.java    | 17 ++++++++++
 .../apache/camel/component/hdfs/HdfsConstants.java |  2 ++
 .../apache/camel/component/hdfs/HdfsConsumer.java  |  1 +
 .../endpoint/dsl/HdfsEndpointBuilderFactory.java   | 36 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 013d53e..f118b2c 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -99,7 +99,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (46 parameters):
+=== Query Parameters (47 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -134,6 +134,7 @@ with the following path and query parameters:
 | *replication* (advanced) | The HDFS replication factor | 3 | short
 | *splitStrategy* (advanced) | In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment, it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way: If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator. Every time a splitting condition is met, a new file is created. The splitStrateg [...]
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *maxMessagesPerPoll* (filter) | To define the maximum number of messages to gather per poll. By default a limit of 100 is set. Can be used, for example, to set a limit of 1000 so that a server starting up with thousands of files does not pick them all up in one poll. Values can only be greater than 0. Note: the limit is applied to the valid files only, so if there are 100000 files and maxMessagesPerPoll=500, only the first 500 valid files are picked up. | 100 | int
 | *backoffErrorThreshold* (scheduler) | The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier should kick-in. |  | int
 | *backoffIdleThreshold* (scheduler) | The number of subsequent idle polls that should happen before the backoffMultiplier should kick-in. |  | int
 | *backoffMultiplier* (scheduler) | To let the scheduled polling consumer back off if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt happens again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured. |  | int
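
For anyone trying out the new option added to the table above, here is a minimal route sketch. The HDFS host, port and directory (localhost:9000, /tmp/input) are placeholders rather than values from this commit; only the maxMessagesPerPoll query parameter comes from the change.

    import org.apache.camel.builder.RouteBuilder;

    // Minimal sketch: cap each scheduled poll of the HDFS consumer at 500 files.
    // Host, port and path below are illustrative placeholders.
    public class HdfsPollLimitRoute extends RouteBuilder {
        @Override
        public void configure() {
            from("hdfs://localhost:9000/tmp/input?maxMessagesPerPoll=500")
                .to("log:hdfs-poll");
        }
    }
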
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index 1e499ed..db97d66 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -83,6 +83,8 @@ public class HdfsConfiguration {
     private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
     @UriParam(defaultValue = "true")
     private boolean connectOnStartup = true;
+    @UriParam(label = "consumer,filter", defaultValue = "" + HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL)
+    private int maxMessagesPerPoll = HdfsConstants.DEFAULT_MAX_MESSAGES_PER_POLL;
     @UriParam
     private String owner;
 
@@ -538,6 +540,21 @@ public class HdfsConfiguration {
         this.connectOnStartup = connectOnStartup;
     }
 
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    /**
+     * To define the maximum number of messages to gather per poll.
+     * By default a limit of 100 is set. Can be used, for example, to set a limit of 1000 so that a server starting up with thousands of files does not pick them all up in one poll.
+     * Values can only be greater than 0.
+     * Note: the limit is applied to the valid files only.
+     * For example, if there are 100000 files and maxMessagesPerPoll=500, only the first 500 valid files are picked up.
+     */
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
     public String getOwner() {
         return owner;
     }
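
The new accessor pair can also be set programmatically when wiring the component in Java. A minimal sketch, assuming HdfsConfiguration keeps its usual public no-arg constructor (not shown in this diff):

    import org.apache.camel.component.hdfs.HdfsConfiguration;

    // Sketch: exercise the new getter/setter pair directly.
    public class MaxMessagesPerPollConfigDemo {
        public static void main(String[] args) {
            HdfsConfiguration config = new HdfsConfiguration();
            config.setMaxMessagesPerPoll(500);                   // must be greater than 0
            System.out.println(config.getMaxMessagesPerPoll());  // prints 500
        }
    }
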
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
index ee89a42..3572ffc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConstants.java
@@ -46,6 +46,8 @@ public final class HdfsConstants {
 
     public static final String HDFS_CLOSE = "CamelHdfsClose";
 
+    public static final int DEFAULT_MAX_MESSAGES_PER_POLL = 100;
+
     private HdfsConstants() {
     }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 4998d7a..affad6c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -132,6 +132,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         Arrays.stream(fileStatuses)
                 .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
                 .filter(this::hasMatchingOwner)
+                .limit(endpointConfig.getMaxMessagesPerPoll())
                 .map(this::createInputStream)
                 .filter(Objects::nonNull)
                 .forEach(hdfsInputStream -> {
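
The limit is placed after the owner and success-file filters, which is why the option text says the cap applies to the valid files only. The standalone sketch below uses plain java.util.stream, not Camel types, to illustrate that ordering:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;

    // Same stream shape as in HdfsConsumer: filters first, then limit(),
    // so only elements that pass the filters count toward the cap.
    public class LimitAfterFilterDemo {
        public static void main(String[] args) {
            List<Integer> picked = IntStream.rangeClosed(1, 100_000).boxed()
                    .filter(n -> n % 2 == 0)   // stand-in for the "valid file" filters
                    .limit(500)                // stand-in for maxMessagesPerPoll
                    .collect(Collectors.toList());
            System.out.println(picked.size()); // 500
        }
    }
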
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
index 80100ac..9e7242b 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
@@ -306,6 +306,42 @@ public interface HdfsEndpointBuilderFactory {
             return this;
         }
         /**
+         * To define the maximum number of messages to gather per poll. By
+         * default a limit of 100 is set. Can be used, for example, to set a
+         * limit of 1000 so that a server starting up with thousands of files
+         * does not pick them all up in one poll. Values can only be greater
+         * than 0. Note: the limit is applied to the valid files only. For
+         * example, if there are 100000 files and maxMessagesPerPoll=500, only
+         * the first 500 valid files are picked up.
+         * 
+         * The option is an <code>int</code> type.
+         * 
+         * Group: filter
+         */
+        default HdfsEndpointConsumerBuilder maxMessagesPerPoll(
+                int maxMessagesPerPoll) {
+            doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
+            return this;
+        }
+        /**
+         * To define the maximum number of messages to gather per poll. By
+         * default a limit of 100 is set. Can be used, for example, to set a
+         * limit of 1000 so that a server starting up with thousands of files
+         * does not pick them all up in one poll. Values can only be greater
+         * than 0. Note: the limit is applied to the valid files only. For
+         * example, if there are 100000 files and maxMessagesPerPoll=500, only
+         * the first 500 valid files are picked up.
+         * 
+         * The option will be converted to an <code>int</code> type.
+         * 
+         * Group: filter
+         */
+        default HdfsEndpointConsumerBuilder maxMessagesPerPoll(
+                String maxMessagesPerPoll) {
+            doSetProperty("maxMessagesPerPoll", maxMessagesPerPoll);
+            return this;
+        }
+        /**
          * The number of subsequent error polls (failed due to some error) that
          * should happen before the backoffMultiplier should kick-in.
          *
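
The generated builder methods can be used from the type-safe endpoint DSL as well. A minimal sketch, assuming the hdfs(...) entry point that camel-endpointdsl exposes on EndpointRouteBuilder and the same placeholder host and path as above; only maxMessagesPerPoll(500) comes from this commit:

    import org.apache.camel.builder.endpoint.EndpointRouteBuilder;

    // Sketch: the same poll limit expressed through the endpoint DSL.
    // The hdfs(...) entry point and the host/path are assumptions, not part of this diff.
    public class HdfsPollLimitDslRoute extends EndpointRouteBuilder {
        @Override
        public void configure() {
            from(hdfs("localhost:9000/tmp/input").maxMessagesPerPoll(500))
                .to("log:hdfs-poll");
        }
    }
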