You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/06/01 04:41:05 UTC

[camel] branch camel-3.14.x updated: CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 6f85df125c6 CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)
6f85df125c6 is described below

commit 6f85df125c6196388cdd0c4a3b030b1d582ba45c
Author: Joan Bordeau <co...@gmail.com>
AuthorDate: Wed Jun 1 06:38:33 2022 +0200

    CAMEL-17100: minio consumer is slow at starting. Change the minio object loading at beginning to exchange by exchange (#7691)
    
    Co-authored-by: jbordeau <jo...@cleyrop.com>
---
 .../camel/component/minio/MinioConsumer.java       | 65 ++++++++++------------
 1 file changed, 28 insertions(+), 37 deletions(-)

diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index f82d5fd3385..562d499a574 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -20,8 +20,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -123,14 +121,12 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
 
         String bucketName = getConfiguration().getBucketName();
         String objectName = getConfiguration().getObjectName();
-        MinioClient minioClient = getMinioClient();
         Queue<Exchange> exchanges;
 
         if (isNotEmpty(objectName)) {
             LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
 
-            InputStream minioObject = getObject(bucketName, minioClient, objectName);
-            exchanges = createExchanges(minioObject, objectName);
+            exchanges = createExchanges(objectName);
             return processBatch(CastUtils.cast(exchanges));
 
         } else {
@@ -185,27 +181,22 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+    protected Queue<Exchange> createExchanges(String objectName) throws Exception {
         Queue<Exchange> answer = new LinkedList<>();
-        Exchange exchange = createExchange(objectStream, objectName);
+        Exchange exchange = createExchange(objectName);
         answer.add(exchange);
-        IOHelper.close(objectStream);
         return answer;
     }
 
     protected Queue<Exchange> createExchanges(Iterator<Result<Item>> minioObjectSummaries) throws Exception {
         int messageCounter = 0;
-        String bucketName = getConfiguration().getBucketName();
-        Collection<InputStream> minioObjects = new ArrayList<>();
         Queue<Exchange> answer = new LinkedList<>();
         try {
             if (getConfiguration().isIncludeFolders()) {
                 do {
                     messageCounter++;
                     Item minioObjectSummary = minioObjectSummaries.next().get();
-                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
-                    minioObjects.add(minioObject);
-                    Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
+                    Exchange exchange = createExchange(minioObjectSummary.objectName());
                     answer.add(exchange);
                     continuationToken = minioObjectSummary.objectName();
                 } while (minioObjectSummaries.hasNext());
@@ -215,9 +206,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
                     Item minioObjectSummary = minioObjectSummaries.next().get();
                     // ignore if directory
                     if (!minioObjectSummary.isDir()) {
-                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
-                        minioObjects.add(minioObject);
-                        Exchange exchange = createExchange(minioObject, minioObjectSummary.objectName());
+                        Exchange exchange = createExchange(minioObjectSummary.objectName());
                         answer.add(exchange);
                         continuationToken = minioObjectSummary.objectName();
                     }
@@ -233,10 +222,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
             LOG.warn("Error getting MinioObject due: {}", e.getMessage());
             throw e;
 
-        } finally {
-            // ensure all previous gathered minio objects are closed
-            // if there was an exception creating the exchanges in this batch
-            minioObjects.forEach(IOHelper::close);
         }
 
         return answer;
@@ -261,7 +246,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
     }
 
     @Override
-    public int processBatch(Queue<Object> exchanges) {
+    public int processBatch(Queue<Object> exchanges) throws Exception {
         int total = exchanges.size();
 
         for (int index = 0; index < total && isBatchAllowed(); index++) {
@@ -275,6 +260,27 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+            if (getConfiguration().isIncludeBody()) {
+                InputStream minioObject;
+                try {
+                    minioObject = getObject(srcBucketName, getMinioClient(), srcObjectName);
+                    exchange.getIn().setBody(IOUtils.toByteArray(minioObject));
+                    if (getConfiguration().isAutoCloseBody()) {
+                        exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                            @Override
+                            public void onDone(Exchange exchange) {
+                                IOHelper.close(minioObject);
+                            }
+                        });
+                    }
+                } catch (Exception e) {
+                    LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+                    throw e;
+                }
+            }
+
             // add on completion to handle after work when the exchange is done
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
                 public void onComplete(Exchange exchange) {
@@ -405,7 +411,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
         return (MinioEndpoint) super.getEndpoint();
     }
 
-    private Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+    private Exchange createExchange(String objectName) throws Exception {
         LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
 
         Exchange exchange = createExchange(true);
@@ -415,21 +421,6 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
 
         getEndpoint().getObjectStat(objectName, message);
 
-        if (getConfiguration().isIncludeBody()) {
-            message.setBody(IOUtils.toByteArray(minioObject));
-            if (getConfiguration().isAutoCloseBody()) {
-                exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
-                    @Override
-                    public void onDone(Exchange exchange) {
-                        IOHelper.close(minioObject);
-                    }
-                });
-            }
-        } else {
-            message.setBody(null);
-            IOHelper.close(minioObject);
-        }
-
         return exchange;
     }