You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/01 04:38:38 UTC
[camel] branch main updated: CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 7de327d91eb CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)
7de327d91eb is described below
commit 7de327d91eb6b238a4c937a31962932e9178c9dc
Author: Joan Bordeau <co...@gmail.com>
AuthorDate: Wed Jun 1 06:38:33 2022 +0200
CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)
Co-authored-by: jbordeau <jo...@cleyrop.com>
---
.../camel/component/minio/MinioConsumer.java | 65 ++++++++++------------
1 file changed, 28 insertions(+), 37 deletions(-)
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index f82d5fd3385..562d499a574 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -20,8 +20,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
@@ -123,14 +121,12 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
String bucketName = getConfiguration().getBucketName();
String objectName = getConfiguration().getObjectName();
- MinioClient minioClient = getMinioClient();
Queue<Exchange> exchanges;
if (isNotEmpty(objectName)) {
LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
- InputStream minioObject = getObject(bucketName, minioClient, objectName);
- exchanges = createExchanges(minioObject, objectName);
+ exchanges = createExchanges(objectName);
return processBatch(CastUtils.cast(exchanges));
} else {
@@ -185,27 +181,22 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
}
}
- protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+ protected Queue<Exchange> createExchanges(String objectName) throws Exception {
Queue<Exchange> answer = new LinkedList<>();
- Exchange exchange = createExchange(objectStream, objectName);
+ Exchange exchange = createExchange(objectName);
answer.add(exchange);
- IOHelper.close(objectStream);
return answer;
}
protected Queue<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception {
int messageCounter = 0;
- String bucketName = getConfiguration().getBucketName();
- Collection<InputStream> minioObjects = new ArrayList<>();
Queue<Exchange> answer = new LinkedList<>();
try {
if (getConfiguration().isIncludeFolders()) {
do {
messageCounter++;
Item minioObjectSummary = minioObjectSummaries.next().get();
- InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
- minioObjects.add(minioObject);
- Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
+ Exchange exchange = createExchange(minioObjectSummary.objectName());
answer.add(exchange);
continuationToken = minioObjectSummary.objectName();
} while (minioObjectSummaries.hasNext());
@@ -215,9 +206,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
Item minioObjectSummary = minioObjectSummaries.next().get();
// ignore if directory
if (!minioObjectSummary.isDir()) {
- InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
- minioObjects.add(minioObject);
- Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
+ Exchange exchange = createExchange(minioObjectSummary.objectName());
answer.add(exchange);
continuationToken = minioObjectSummary.objectName();
}
@@ -233,10 +222,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
LOG.warn("Error getting MinioObject due: {}", e.getMessage());
throw e;
- } finally {
- // ensure all previous gathered minio objects are closed
- // if there was an exception creating the exchanges in this batch
- minioObjects.forEach(IOHelper::close);
}
return answer;
@@ -261,7 +246,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
}
@Override
- public int processBatch(Queue<Object> exchanges) {
+ public int processBatch(Queue<Object> exchanges) throws Exception {
int total = exchanges.size();
for (int index = 0; index < total && isBatchAllowed(); index++) {
@@ -275,6 +260,27 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
// update pending number of exchanges
pendingExchanges = total - index - 1;
+ String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+ String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+ if (getConfiguration().isIncludeBody()) {
+ InputStream minioObject;
+ try {
+ minioObject = getObject(srcBucketName, getMinioClient(), srcObjectName);
+ exchange.getIn().setBody(IOUtils.toByteArray(minioObject));
+ if (getConfiguration().isAutoCloseBody()) {
+ exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange exchange) {
+ IOHelper.close(minioObject);
+ }
+ });
+ }
+ } catch (Exception e) {
+ LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+ throw e;
+ }
+ }
+
// add on completion to handle after work when the exchange is done
exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
@@ -405,7 +411,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
return (MinioEndpoint) super.getEndpoint();
}
- private Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+ private Exchange createExchange(String objectName) throws Exception {
LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
Exchange exchange = createExchange(true);
@@ -415,21 +421,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
getEndpoint().getObjectStat(objectName, message);
- if (getConfiguration().isIncludeBody()) {
- message.setBody(IOUtils.toByteArray(minioObject));
- if (getConfiguration().isAutoCloseBody()) {
- exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
- @Override
- public void onDone(Exchange exchange) {
- IOHelper.close(minioObject);
- }
- });
- }
- } else {
- message.setBody(null);
- IOHelper.close(minioObject);
- }
-
return exchange;
}