You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/07 11:13:47 UTC
(camel) branch main updated: Scheduled consumer should be able to mark as ready sooner than after first poll (default) (#12346)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ee17670c284 Scheduled consumer should be able to mark as ready sooner than after first poll (default) (#12346)
ee17670c284 is described below
commit ee17670c284fd7d7ba163481148fd3cc80ff9092
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 7 12:13:39 2023 +0100
Scheduled consumer should be able to mark as ready sooner than after first poll (default) (#12346)
* CAMEL-20189: camel-ftp: Force marking the consumer ready sooner, in case downloading a big file takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-minio: Force marking the consumer ready sooner, in case downloading a big file takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-mail: Force marking the consumer ready sooner, in case downloading mail with big files takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-azure: Force marking the consumer ready sooner, in case downloading big files takes too long, causing readiness check to timeout and fail.
* Regen
* CAMEL-20189: camel-ironmq: Force marking the consumer ready sooner, in case downloading big files takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-aws: Force marking the consumer ready sooner, in case downloading big files takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-jpa: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-jsql and camel-mybatis: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-jooq: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-slack/splunk: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-huawei: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-google: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-core: Force API based consumer marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-couchbase: Force marking the consumer ready sooner, in case processing exchanges takes too long, causing readiness check to timeout and fail.
* CAMEL-20189: camel-core: Add better API on scheduled poll consumer to force mark the consumer ready.
* CAMEL-20189: migration guide.
* Regen docs
---
.../aws/cloudtrail/CloudtrailConsumer.java | 3 +++
.../aws2/ddbstream/Ddb2StreamConsumer.java | 5 ++++
.../component/aws2/kinesis/Kinesis2Consumer.java | 3 +++
.../aws2/redshift/data/RedshiftData2Producer.java | 2 --
.../camel/component/aws2/s3/AWS2S3Consumer.java | 4 +++
.../camel/component/aws2/sqs/Sqs2Consumer.java | 3 +++
components/camel-azure/camel-azure-files/pom.xml | 2 +-
.../services/org/apache/camel/component.properties | 2 +-
.../camel/component/file/azure/FilesConsumer.java | 3 +++
.../component/azure/storage/blob/BlobConsumer.java | 4 ++-
.../camel-azure-storage-datalake/pom.xml | 4 +--
.../services/org/apache/camel/component.properties | 4 +--
.../azure/storage/datalake/DataLakeConsumer.java | 6 +++--
.../azure/storage/queue/QueueConsumer.java | 4 ++-
.../component/couchbase/CouchbaseConsumer.java | 3 +++
.../component/file/remote/RemoteFileConsumer.java | 3 +++
.../mail/stream/GoogleMailStreamConsumer.java | 4 +++
.../sheets/stream/GoogleSheetsStreamConsumer.java | 3 +++
.../google/storage/GoogleCloudStorageConsumer.java | 10 ++++---
.../component/huaweicloud/obs/OBSConsumer.java | 15 +++--------
.../camel/component/ironmq/IronMQConsumer.java | 3 +++
.../apache/camel/component/jooq/JooqConsumer.java | 5 +++-
.../apache/camel/component/jpa/JpaConsumer.java | 3 +++
.../apache/camel/component/mail/MailConsumer.java | 3 +++
.../camel/component/minio/MinioConsumer.java | 4 ++-
.../camel/component/mybatis/MyBatisConsumer.java | 3 +++
.../camel/component/slack/SlackConsumer.java | 3 +++
.../camel/component/splunk/SplunkConsumer.java | 4 +++
.../apache/camel/component/sql/SqlConsumer.java | 3 +++
.../mbean/ManagedSchedulePollConsumerMBean.java | 3 +++
.../mbean/ManagedScheduledPollConsumer.java | 5 ++++
.../camel/support/ScheduledPollConsumer.java | 30 ++++++++++++++++++++-
.../support/ScheduledPollConsumerHealthCheck.java | 6 ++---
.../support/component/AbstractApiConsumer.java | 6 +++--
.../ROOT/examples/json/kubernetes-cronjob.json | 1 +
docs/components/modules/ROOT/nav.adoc | 1 +
.../ROOT/pages/kubernetes-cronjob-component.adoc | 1 +
.../ROOT/pages/camel-4x-upgrade-guide-4_3.adoc | 31 ++++++++++++++++++++++
.../HashicorpVaultLocalContainerService.java | 16 +++++++++++
39 files changed, 184 insertions(+), 34 deletions(-)
diff --git a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java b/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java
index 1a54b29cee9..1740d05c0bf 100644
--- a/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java
+++ b/components/camel-aws/camel-aws-cloudtrail/src/main/java/org/apache/camel/component/aws/cloudtrail/CloudtrailConsumer.java
@@ -63,6 +63,9 @@ public class CloudtrailConsumer extends ScheduledBatchPollingConsumer {
LookupEventsResponse response = getClient().lookupEvents(eventsRequestBuilder.build());
+ // okay we have some response from aws so lets mark the consumer as ready
+ forceConsumerAsReady();
+
if (!response.events().isEmpty()) {
lastTime = response.events().get(0).eventTime();
}
diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 62491771907..9e3aa959d0a 100644
--- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -55,7 +55,12 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer {
@Override
protected int poll() throws Exception {
int processedExchangeCount = 0;
+
Map<String, String> shardIterators = shardIteratorHandler.getShardIterators();
+
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+
for (Entry<String, String> shardIteratorEntry : shardIterators.entrySet()) {
int limitPerRecordsRequest = Math.max(1,
getEndpoint().getConfiguration().getMaxResultsPerRequest() / shardIterators.size());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 87178332e51..0e750f8268c 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -113,6 +113,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
});
}
+ // okay we have some response from aws so lets mark the consumer as ready
+ forceConsumerAsReady();
+
return processedExchangeCount.get();
}
diff --git a/components/camel-aws/camel-aws2-redshift/src/main/java/org/apache/camel/component/aws2/redshift/data/RedshiftData2Producer.java b/components/camel-aws/camel-aws2-redshift/src/main/java/org/apache/camel/component/aws2/redshift/data/RedshiftData2Producer.java
index 477a3c77f20..5f7e1c18052 100644
--- a/components/camel-aws/camel-aws2-redshift/src/main/java/org/apache/camel/component/aws2/redshift/data/RedshiftData2Producer.java
+++ b/components/camel-aws/camel-aws2-redshift/src/main/java/org/apache/camel/component/aws2/redshift/data/RedshiftData2Producer.java
@@ -22,8 +22,6 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
-import org.apache.camel.health.HealthCheck;
-import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
diff --git a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 5cd358cf8b8..ff8234de3a1 100644
--- a/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ b/components/camel-aws/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -160,6 +160,10 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer {
exchanges = createExchanges(listObjects.contents());
}
+
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+
return processBatch(CastUtils.cast(exchanges));
}
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 1981c441b23..b6f31b371f5 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -122,6 +122,9 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
LOG.trace("Received {} messages", messageResult.messages().size());
}
+ // okay we have some response from aws so lets mark the consumer as ready
+ forceConsumerAsReady();
+
Queue<Exchange> exchanges = createExchanges(messageResult.messages());
return processBatch(CastUtils.cast(exchanges));
}
diff --git a/components/camel-azure/camel-azure-files/pom.xml b/components/camel-azure/camel-azure-files/pom.xml
index fade9b5d4d8..83fa0e15b8d 100644
--- a/components/camel-azure/camel-azure-files/pom.xml
+++ b/components/camel-azure/camel-azure-files/pom.xml
@@ -28,7 +28,7 @@
<artifactId>camel-azure-files</artifactId>
<packaging>jar</packaging>
- <name>Camel :: Azure Files</name>
+ <name>Camel :: Azure :: Files</name>
<description>Camel Azure Files Component</description>
<properties>
diff --git a/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/component.properties
index 86c53ac1506..6b103ed8de2 100644
--- a/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/component.properties
+++ b/components/camel-azure/camel-azure-files/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -3,5 +3,5 @@ components=azure-files
groupId=org.apache.camel
artifactId=camel-azure-files
version=4.3.0-SNAPSHOT
-projectName=Camel :: Azure Files
+projectName=Camel :: Azure :: Files
projectDescription=Camel Azure Files Component
diff --git a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesConsumer.java b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesConsumer.java
index 53214c9afbd..d40d1305f3f 100644
--- a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesConsumer.java
+++ b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/FilesConsumer.java
@@ -102,6 +102,9 @@ public class FilesConsumer extends RemoteFileConsumer<ShareFileItem> {
var listedFileItems = listFileItems(path);
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+
if (listedFileItems == null || listedFileItems.length == 0) {
LOG.trace("No files found in directory: {}", path);
return true;
diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
index be69d8b7f1e..b52c9e0855f 100644
--- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
+++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConsumer.java
@@ -108,8 +108,10 @@ public class BlobConsumer extends ScheduledBatchPollingConsumer {
final List<BlobItem> blobs = (List<BlobItem>) containerOperations.listBlobs(null).getBody();
- final Queue<Exchange> exchanges = new LinkedList<>();
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+ final Queue<Exchange> exchanges = new LinkedList<>();
for (BlobItem blobItem : blobs) {
exchanges.add(createExchangeFromBlob(blobItem.getName(), blobContainerClient));
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/pom.xml b/components/camel-azure/camel-azure-storage-datalake/pom.xml
index 1790dee2519..05741b903a0 100644
--- a/components/camel-azure/camel-azure-storage-datalake/pom.xml
+++ b/components/camel-azure/camel-azure-storage-datalake/pom.xml
@@ -29,8 +29,8 @@
<artifactId>camel-azure-storage-datalake</artifactId>
<packaging>jar</packaging>
- <name>Camel :: Azure :: Datalake Gen2</name>
- <description>Camel Azure Datalake Gen2 Component</description>
+ <name>Camel :: Azure :: Storage Datalake</name>
+ <description>Camel Azure Datalake Component</description>
<dependencies>
<dependency>
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/generated/resources/META-INF/services/org/apache/camel/component.properties b/components/camel-azure/camel-azure-storage-datalake/src/generated/resources/META-INF/services/org/apache/camel/component.properties
index e080e463473..653098aba29 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/generated/resources/META-INF/services/org/apache/camel/component.properties
+++ b/components/camel-azure/camel-azure-storage-datalake/src/generated/resources/META-INF/services/org/apache/camel/component.properties
@@ -3,5 +3,5 @@ components=azure-storage-datalake
groupId=org.apache.camel
artifactId=camel-azure-storage-datalake
version=4.3.0-SNAPSHOT
-projectName=Camel :: Azure :: Datalake Gen2
-projectDescription=Camel Azure Datalake Gen2 Component
+projectName=Camel :: Azure :: Storage Datalake
+projectDescription=Camel Azure Datalake Component
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
index f288230e078..ee9c739846b 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConsumer.java
@@ -87,15 +87,17 @@ public class DataLakeConsumer extends ScheduledBatchPollingConsumer {
= new DataLakeFileSystemOperations(getEndpoint().getConfiguration(), fileSystemClientWrapper);
final List<PathItem> items = (List<PathItem>) fileSystemOperations.listPaths(null).getBody();
- final Queue<Exchange> exchanges = new LinkedList<>();
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+
+ final Queue<Exchange> exchanges = new LinkedList<>();
for (PathItem pathItem : items) {
if (!pathItem.isDirectory()) {
exchanges.add(createExchangeFromFile(pathItem.getName(), dataLakeFileSystemClient));
}
}
return exchanges;
-
}
private Exchange createExchangeFromFile(final String fileName, final DataLakeFileSystemClient dataLakeFileSystemClient)
diff --git a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
index 9758e823768..46980783b2f 100644
--- a/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
+++ b/components/camel-azure/camel-azure-storage-queue/src/main/java/org/apache/camel/component/azure/storage/queue/QueueConsumer.java
@@ -49,7 +49,6 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer {
public QueueConsumer(final QueueEndpoint endpoint, final Processor processor) {
super(endpoint, processor);
-
}
@Override
@@ -71,6 +70,9 @@ public class QueueConsumer extends ScheduledBatchPollingConsumer {
getConfiguration().getVisibilityTimeout(),
getConfiguration().getTimeout());
+ // okay we have some response from azure so lets mark the consumer as ready
+ forceConsumerAsReady();
+
LOG.trace("Receiving messages [{}]...", messageItems);
final Queue<Exchange> exchanges = createExchanges(messageItems);
diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index af29bd51500..8b252244f4c 100644
--- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -112,6 +112,9 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer implements R
protected synchronized int poll() throws Exception {
ViewResult result = bucket.viewQuery(endpoint.getDesignDocumentName(), endpoint.getViewName(), this.viewOptions);
+ // okay we have some response from CouchBase so lets mark the consumer as ready
+ forceConsumerAsReady();
+
if (LOG.isTraceEnabled()) {
LOG.trace("ViewResponse = {}", result);
}
diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index 82ff2fff563..bc50429d546 100644
--- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -94,6 +94,9 @@ public abstract class RemoteFileConsumer<T> extends GenericFileConsumer<T> {
loggedInWarning = false;
}
+ // we are logged in so lets mark the consumer as ready
+ forceConsumerAsReady();
+
return true;
}
diff --git a/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
index 285c8f37ca1..aba62302a23 100644
--- a/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
+++ b/components/camel-google/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
@@ -86,6 +86,10 @@ public class GoogleMailStreamConsumer extends ScheduledBatchPollingConsumer {
Queue<Exchange> answer = new LinkedList<>();
ListMessagesResponse c = request.execute();
+
+ // okay we have some response from Google so lets mark the consumer as ready
+ forceConsumerAsReady();
+
if (c.getMessages() != null) {
for (Message message : c.getMessages()) {
Message mess = getClient().users().messages().get("me", message.getId()).setFormat("FULL").execute();
diff --git a/components/camel-google/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java b/components/camel-google/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
index e6540671fa9..f39cb20a4b3 100644
--- a/components/camel-google/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
+++ b/components/camel-google/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
@@ -80,6 +80,9 @@ public class GoogleSheetsStreamConsumer extends ScheduledBatchPollingConsumer {
BatchGetValuesResponse response = request.execute();
+ // okay we have some response from Google so lets mark the consumer as ready
+ forceConsumerAsReady();
+
if (response.getValueRanges() != null) {
if (getConfiguration().isSplitResults()) {
for (ValueRange valueRange : response.getValueRanges()) {
diff --git a/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java b/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
index ca308e15056..7fa029ad7cc 100644
--- a/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
+++ b/components/camel-google/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Bucket;
@@ -97,9 +98,13 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
} else {
LOG.trace("Queueing objects in bucket [{}]...", bucketName);
- List<Blob> bloblist = new LinkedList<>();
- for (Blob blob : getStorageClient().list(bucketName).iterateAll()) {
+ Page<Blob> page = getStorageClient().list(bucketName);
+
+ // okay we have some response from Google so lets mark the consumer as ready
+ forceConsumerAsReady();
+ List<Blob> bloblist = new LinkedList<>();
+ for (Blob blob : page.iterateAll()) {
if (filter != null && !filter.isEmpty()) {
if (blob.getBlobId().getName().matches(filter)) {
bloblist.add(blob);
@@ -107,7 +112,6 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
} else {
bloblist.add(blob);
}
-
}
if (LOG.isTraceEnabled()) {
diff --git a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java
index 89412cdb101..79d2b361b34 100644
--- a/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java
+++ b/components/camel-huawei/camel-huaweicloud-obs/src/main/java/org/apache/camel/component/huaweicloud/obs/OBSConsumer.java
@@ -139,6 +139,9 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
ObjectListing objectListing = obsClient.listObjects(request);
+ // okay we have some response from huawei so lets mark the consumer as ready
+ forceConsumerAsReady();
+
// if the list is truncated, set marker for next poll. Otherwise, set marker to null
if (objectListing.isTruncated()) {
marker = objectListing.getNextMarker();
@@ -187,8 +190,6 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
/**
* Create exchanges for each OBS object in obsObjects
- *
- * @param obsObjects
*/
private Queue<Exchange> createExchanges(List<ObsObject> obsObjects) {
Queue<Exchange> answer = new LinkedList<>();
@@ -196,7 +197,7 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
ObsObject obsObject;
if (objectSummary.getMetadata().getContentType() == null) {
- // object was the from list objects. Since not all object data is included when listing objects, we must retrieve all the data by calling getObject
+ // object was from list objects. Since not all object data is included when listing objects, we must retrieve all the data by calling getObject
obsObject = obsClient.getObject(endpoint.getBucketName(), objectSummary.getObjectKey());
} else {
// object was already retrieved using getObjects
@@ -214,8 +215,6 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
/**
* Determine of obsObject should be included as an exchange based on the includeFolders user option
- *
- * @param obsObject
*/
private boolean includeObsObject(ObsObject obsObject) {
return endpoint.isIncludeFolders() || !obsObject.getObjectKey().endsWith("/");
@@ -223,8 +222,6 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
/**
* Create a new exchange from obsObject
- *
- * @param obsObject
*/
public Exchange createExchange(ObsObject obsObject) {
Exchange exchange = createExchange(true);
@@ -237,8 +234,6 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
/**
* To handle the exchange after it has been processed
- *
- * @param exchange
*/
private void processComplete(Exchange exchange) {
String bucketName = exchange.getIn().getHeader(OBSHeaders.BUCKET_NAME, String.class);
@@ -257,8 +252,6 @@ public class OBSConsumer extends ScheduledBatchPollingConsumer {
/**
* To handle when the exchange failed
- *
- * @param exchange
*/
private void processFailure(Exchange exchange) {
Exception exception = exchange.getException();
diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
index 6bad0e60fe5..a91fbb99ee7 100644
--- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
+++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
@@ -79,6 +79,9 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer {
getEndpoint().getConfiguration().getWait());
LOG.trace("Received {} messages", messages.getSize());
+ // okay we have some response from ironmq so lets mark the consumer as ready
+ forceConsumerAsReady();
+
Queue<Exchange> exchanges = createExchanges(messages.getMessages());
int noProcessed = processBatch(CastUtils.cast(exchanges));
// delete all processed messages in one batch;
diff --git a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
index 82e7a59f376..d1f8e8cb429 100644
--- a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
+++ b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
@@ -60,7 +60,10 @@ public class JooqConsumer extends ScheduledBatchPollingConsumer {
Queue<DataHolder> answer = new LinkedList<>();
Result<UpdatableRecord<?>> results = context.selectFrom(getTable(entityType)).fetch();
- for (UpdatableRecord result : results) {
+ // okay we have some response from jooq so lets mark the consumer as ready
+ forceConsumerAsReady();
+
+ for (UpdatableRecord<?> result : results) {
DataHolder holder = new DataHolder();
holder.exchange = createExchange(result);
answer.add(holder);
diff --git a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index f4d9b451d64..620a44dbde7 100644
--- a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++ b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -119,6 +119,9 @@ public class JpaConsumer extends ScheduledBatchPollingConsumer {
List<?> results = toExecute.getResultList();
LOG.trace("Got result list from query {}", results);
+ // okay we have some response from jpa so lets mark the consumer as ready
+ forceConsumerAsReady();
+
for (Object result : results) {
DataHolder holder = new DataHolder();
holder.manager = entityManager;
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index 15037708d41..4a818e112d9 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -158,6 +158,9 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
return 0; // return since we cannot poll mail messages, but will re-connect on next poll.
}
+ // okay consumer is connected to the mail server
+ forceConsumerAsReady();
+
try {
int count = folder.getMessageCount();
if (count > 0) {
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index 49647d77814..deaf5e4eda8 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -166,6 +166,9 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
Iterator<Result<Item>> listObjects = getMinioClient().listObjects(listObjectRequest.build()).iterator();
+ // we have listed some objects so mark the consumer as ready
+ forceConsumerAsReady();
+
if (listObjects.hasNext()) {
exchanges = createExchanges(listObjects);
if (maxMessagesPerPoll <= 0 || exchanges.size() < maxMessagesPerPoll) {
@@ -224,7 +227,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
} catch (Exception e) {
LOG.warn("Error getting MinioObject due: {}", e.getMessage());
throw e;
-
}
return answer;
diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
index 237c175262d..6bf0e4e35a3 100644
--- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
+++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
@@ -78,6 +78,9 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Polling: {}", endpoint);
List<?> data = endpoint.getProcessingStrategy().poll(this, getEndpoint());
+ // okay we have some response from MyBatis so lets mark the consumer as ready
+ forceConsumerAsReady();
+
// create a list of exchange objects with the data
Queue<DataHolder> answer = new LinkedList<>();
if (useIterator) {
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
index fdac796fda8..d050060ebb3 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -85,6 +85,9 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
throw new RuntimeCamelException("API request conversations.history to Slack failed: " + response);
}
+ // okay we have some response from slack so lets mark the consumer as ready
+ forceConsumerAsReady();
+
Queue<Exchange> exchanges = createExchanges(response.getMessages());
return processBatch(CastUtils.cast(exchanges));
}
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
index 492b82fb469..c97e35d048d 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -75,6 +75,10 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer {
return 0;
} else {
List<SplunkEvent> events = dataReader.read();
+
+ // okay we have some response from splunk so lets mark the consumer as ready
+ forceConsumerAsReady();
+
Queue<Exchange> exchanges = createExchanges(events);
return processBatch(CastUtils.cast(exchanges));
}
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
index 81ad935080b..bf18b64d4bc 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlConsumer.java
@@ -178,6 +178,9 @@ public class SqlConsumer extends ScheduledBatchPollingConsumer {
}
}
+ // okay we have some response from SQL so lets mark the consumer as ready
+ forceConsumerAsReady();
+
// process all the exchanges in this batch
try {
if (answer.isEmpty()) {
diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
index 77aa84b89f7..71ee4463672 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
@@ -93,6 +93,9 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean {
@ManagedAttribute(description = "Whether a first pool attempt has been done (also if the consumer has been restarted)")
boolean isFirstPollDone();
+ @ManagedAttribute(description = "Whether the consumer is ready to handle incoming traffic (used for readiness health-check)")
+ boolean isConsumerReady();
+
@ManagedAttribute(description = "Total number of polls run")
long getCounter();
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
index 7c39603de50..ed0a1b5267e 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
@@ -158,6 +158,11 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man
return getConsumer().isFirstPollDone();
}
+ @Override
+ public boolean isConsumerReady() {
+ return getConsumer().isConsumerReady();
+ }
+
@Override
public long getCounter() {
return getConsumer().getCounter();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 86337c9979c..7081fa6c7f5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -80,6 +80,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
private volatile Map<String, Object> lastErrorDetails;
private final AtomicLong counter = new AtomicLong();
private volatile boolean firstPollDone;
+ private volatile boolean forceReady;
public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -480,15 +481,40 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
}
/**
- * Whether a first pool attempt has been done (also if the consumer has been restarted)
+ * Whether a first pool attempt has been done (also if the consumer has been restarted).
*/
public boolean isFirstPollDone() {
return firstPollDone;
}
+ /**
+ * Whether the consumer is ready and has established connection to its target system, or first poll has been
+ * completed successfully.
+ *
+ * The health-check is using this information to know when the consumer is ready for readiness checks.
+ */
+ public boolean isConsumerReady() {
+ // we regard the consumer as ready if it was explicit forced to be ready (component specific)
+ // or that it has completed its first poll without an exception was thrown
+ // during connecting to target system and accepting data
+ return forceReady || firstPollDone;
+ }
+
// Implementation methods
// -------------------------------------------------------------------------
+ /**
+ * Forces the consumer to be marked as ready. This can be used by components that need to mark this sooner than
+ * usual (default marked as ready after first poll is done). This allows health-checks to be ready before an entire
+ * poll is completed.
+ *
+ * This is for example needed by the FTP component as polling a large file can take long time, causing a
+ * health-check to not be ready within reasonable time.
+ */
+ protected void forceConsumerAsReady() {
+ forceReady = true;
+ }
+
/**
* Gets the last caused error (exception) for the last poll that failed. When the consumer is successfully again,
* then the error resets to null.
@@ -672,7 +698,9 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
errorCounter = 0;
successCounter = 0;
counter.set(0);
+ // clear ready state
firstPollDone = false;
+ forceReady = false;
super.doStop();
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
index 69563f260bc..6565156bad4 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
@@ -83,13 +83,13 @@ public class ScheduledPollConsumerHealthCheck implements HealthCheck {
}
long ec = consumer.getErrorCounter();
- boolean first = consumer.isFirstPollDone();
+ boolean ready = consumer.isConsumerReady();
Throwable cause = consumer.getLastError();
boolean healthy = ec == 0;
boolean readiness = kind.equals(Kind.READINESS);
- if (readiness && !first) {
- // special for readiness check before first poll is done
+ if (readiness && !ready) {
+ // special for readiness check before first poll is done or not yet ready
// if initial state is UP or UNKNOWN then return that
// otherwise we are DOWN
boolean down = builder.state().equals(State.DOWN);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/component/AbstractApiConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/component/AbstractApiConsumer.java
index 93b8b85ddb0..0837e4346bf 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/component/AbstractApiConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/component/AbstractApiConsumer.java
@@ -64,10 +64,12 @@ public abstract class AbstractApiConsumer<E extends Enum<E> & ApiName, T>
interceptProperties(args);
try {
-
Object result = doInvokeMethod(args);
- return ApiConsumerHelper.getResultsProcessed(this, result, isSplitResult());
+ // okay we have some response so lets mark the consumer as ready
+ forceConsumerAsReady();
+
+ return ApiConsumerHelper.getResultsProcessed(this, result, isSplitResult());
} catch (Exception t) {
throw RuntimeCamelException.wrapRuntimeCamelException(t);
}
diff --git a/docs/components/modules/ROOT/examples/json/kubernetes-cronjob.json b/docs/components/modules/ROOT/examples/json/kubernetes-cronjob.json
new file mode 120000
index 00000000000..05ff1d9b409
--- /dev/null
+++ b/docs/components/modules/ROOT/examples/json/kubernetes-cronjob.json
@@ -0,0 +1 @@
+../../../../../../components/camel-kubernetes/src/generated/resources/org/apache/camel/component/kubernetes/cronjob/kubernetes-cronjob.json
\ No newline at end of file
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index 83cb5f1c69d..0417295ecf9 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -187,6 +187,7 @@
** xref:knative-component.adoc[Knative]
** xref:kubernetes-summary.adoc[Kubernetes]
*** xref:kubernetes-config-maps-component.adoc[Kubernetes ConfigMap]
+*** xref:kubernetes-cronjob-component.adoc[Kubernetes Cronjob]
*** xref:kubernetes-custom-resources-component.adoc[Kubernetes Custom Resources]
*** xref:kubernetes-deployments-component.adoc[Kubernetes Deployments]
*** xref:kubernetes-events-component.adoc[Kubernetes Event]
diff --git a/docs/components/modules/ROOT/pages/kubernetes-cronjob-component.adoc b/docs/components/modules/ROOT/pages/kubernetes-cronjob-component.adoc
new file mode 120000
index 00000000000..d90d48d334c
--- /dev/null
+++ b/docs/components/modules/ROOT/pages/kubernetes-cronjob-component.adoc
@@ -0,0 +1 @@
+../../../../../components/camel-kubernetes/src/main/docs/kubernetes-cronjob-component.adoc
\ No newline at end of file
diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
index 7aed55f32b0..85deab6fd1c 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
@@ -45,6 +45,37 @@ from("seda:c")
.to("seda:d")
----
+=== Consumer health checks
+
+The scheduled consumers has been improved to mark the consumer as _ready_ sooner, when possible. Previously a consumer,
+would mark as ready after the first poll was completed. For example, a FTP consumer downloading a big file on first poll,
+could take soo long time, that the readiness check would timeout and fail during startup of your Camel application.
+
+The following components is now marking the consumer as ready sooner:
+
+- camel-aws
+- camel-azure
+- camel-box
+- camel-dhis2
+- camel-fhir
+- camel-couchbase
+- camel-ftp
+- camel-google
+- camel-ironmq
+- camel-jooq
+- camel-jpa
+- camel-mail
+- camel-minio
+- camel-mybatis
+- camel-olingo2
+- camel-olingo4
+- camel-slack
+- camel-splunk
+- camel-sql
+- camel-twilio
+- camel-zendesk
+
+
=== camel-management
If the `nodeIdPrefix` has been configured on routes, then the MBeans for the processors will now use the prefix
diff --git a/test-infra/camel-test-infra-hashicorp-vault/src/test/java/org/apache/camel/test/infra/hashicorp/vault/services/HashicorpVaultLocalContainerService.java b/test-infra/camel-test-infra-hashicorp-vault/src/test/java/org/apache/camel/test/infra/hashicorp/vault/services/HashicorpVaultLocalContainerService.java
index fa8d772be43..05fa0f8784e 100644
--- a/test-infra/camel-test-infra-hashicorp-vault/src/test/java/org/apache/camel/test/infra/hashicorp/vault/services/HashicorpVaultLocalContainerService.java
+++ b/test-infra/camel-test-infra-hashicorp-vault/src/test/java/org/apache/camel/test/infra/hashicorp/vault/services/HashicorpVaultLocalContainerService.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with