You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2020/07/27 09:52:49 UTC

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r460750413



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {

Review comment:
       I don't see any reason to null the client here, is there?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {

Review comment:
       Perhaps we can use `isNotEmpty` from `org.apache.camel.util.ObjectHelper` to check for nulls or emptiness.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConfiguration.java
##########
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.time.ZonedDateTime;
+
+import io.minio.MinioClient;
+import io.minio.ServerSideEncryption;
+import io.minio.ServerSideEncryptionCustomerKey;
+import okhttp3.OkHttpClient;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class MinioConfiguration implements Cloneable {
+
+    @UriParam
+    private String endpoint;
+    @UriParam
+    private Integer proxyPort;
+
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+    @UriParam(defaultValue = "false")
+    private boolean secure;
+
+    @UriParam

Review comment:
       Labels are missing here, for example `producer`, `consumer`, `common`, etc.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {
+                minioClient = null;
+            }
+        }
+        super.doStop();
+    }
+
+    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        return createExchange(getExchangePattern(), minioObject, objectName);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern,
+                                   InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = super.createExchange(pattern);
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(readInputStream(minioObject));
+                if (getConfiguration().isAutocloseBody()) {
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                        @Override
+                        public void onDone(Exchange exchange) {
+                            IOHelper.close(minioObject);
+                        }
+                    });
+                }
+
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        } else {
+            message.setBody(null);
+            IOHelper.close(minioObject);
+        }
+
+        return exchange;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MinioClient getMinioClient() {
+        return minioClient;
+    }
+
+    public void setMinioClient(MinioClient minioClient) {
+        this.minioClient = minioClient;
+    }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    /**
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * <p/>
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * The default value is 10. Use 0 or a negative number to set it as
+     * unlimited.
+     */
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * Set the maxConnections parameter in the minio client configuration
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    private String readInputStream(InputStream minioObject) throws IOException {
+        StringBuilder textBuilder = new StringBuilder();
+        try (Reader reader = new BufferedReader(new InputStreamReader(minioObject, Charset.forName(StandardCharsets.UTF_8.name())))) {
+            int c;
+            while ((c = reader.read()) != -1) {
+                textBuilder.append((char) c);
+            }
+        }
+        return textBuilder.toString();
+    }
+
+    private boolean bucketExists(String bucketName) throws Exception {
+        return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
+    }
+
+    private void makeBucket(String bucketName) throws Exception {
+        MakeBucketArgs.Builder makeBucketRequest = MakeBucketArgs.builder().bucket(bucketName).objectLock(getConfiguration().isObjectLock());
+        if (getConfiguration().getRegion() != null) {
+            makeBucketRequest.region(getConfiguration().getRegion());
+        }
+        minioClient.makeBucket(makeBucketRequest.build());
+    }
+
+    private void setBucketPolicy(String bucketName) throws Exception {
+        LOG.trace("Updating bucket {} with policy...", bucketName);
+        minioClient.setBucketPolicy(
+                SetBucketPolicyArgs.builder().bucket(bucketName).config(getConfiguration().getPolicy()).build());
+        LOG.trace("Bucket policy updated");
+    }
+
+    private void getObjectStat(String objectName, Message message) throws Exception {
+
+        String bucketName = getConfiguration().getBucketName();
+        StatObjectArgs.Builder statObjectRequest = StatObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            statObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {

Review comment:
       These checks are pretty redundant. Perhaps what we can do here is wrap them into centralized lambdas somewhere and just pass the function reference. Example:
   ```
   private void checkMatchTagConfig(final MinioConfiguration configuration, final java.util.function.Consumer<String> fn) {
           if ( ObjectHelper.isNotEmpty(configuration.getMatchETag()) ) {
               fn.accept(configuration.getMatchETag());
           }
       }
   ```  
   And then somewhere in the code:
   ```
   checkMatchTagConfig(getConfiguration(), statObjectRequest::matchETag);
   ```

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {
+                minioClient = null;
+            }
+        }
+        super.doStop();
+    }
+
+    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        return createExchange(getExchangePattern(), minioObject, objectName);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern,
+                                   InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = super.createExchange(pattern);
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(readInputStream(minioObject));
+                if (getConfiguration().isAutocloseBody()) {
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                        @Override
+                        public void onDone(Exchange exchange) {
+                            IOHelper.close(minioObject);
+                        }
+                    });
+                }
+
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();

Review comment:
       Please don't print the stack trace here; handle the exception instead.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    /**
+     * Creates a producer bound to the given endpoint.
+     *
+     * @param endpoint the endpoint this producer sends to
+     */
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Returns the message the operations of this producer write their results to,
+     * i.e. {@code exchange.getMessage()}.
+     *
+     * @param exchange the exchange being processed
+     * @return the message to populate with result headers and body
+     */
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    /**
+     * Runs the operation resolved for the exchange against the endpoint's client.
+     * When no operation is resolved, the message body is uploaded with putObject.
+     *
+     * @param exchange the exchange to process
+     * @throws IllegalArgumentException if the resolved operation has no handler here
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        final MinioOperations requested = determineOperation(exchange);
+        final MinioClient client = getEndpoint().getMinioClient();
+
+        if (ObjectHelper.isEmpty(requested)) {
+            putObject(client, exchange);
+            return;
+        }
+
+        switch (requested) {
+            case copyObject:
+                copyObject(client, exchange);
+                break;
+            case deleteObject:
+                deleteObject(client, exchange);
+                break;
+            case deleteObjects:
+                deleteObjects(client, exchange);
+                break;
+            case listBuckets:
+                listBuckets(client, exchange);
+                break;
+            case deleteBucket:
+                deleteBucket(client, exchange);
+                break;
+            case listObjects:
+                listObjects(client, exchange);
+                break;
+            case getObject:
+                getObject(client, exchange);
+                break;
+            case getPartialObject:
+                getPartialObject(client, exchange);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    /**
+     * Uploads the message body as an object.
+     * <p>
+     * With POJO requests enabled the body must be a {@link PutObjectArgs.Builder}, which is built
+     * and sent as-is. Otherwise the body is read as a {@link File} (directly or wrapped in a
+     * {@link WrappedFile}) or as an {@link InputStream}, and bucket, object name, metadata and
+     * extra headers are resolved from the exchange and the endpoint configuration.
+     * <p>
+     * The returned ETag, and the version id when one is returned, are set as headers on the
+     * response message.
+     *
+     * @param minioClient the client used to perform the upload
+     * @param exchange    the exchange carrying the payload and headers
+     * @throws Exception if the body is missing or cannot be converted, or the upload fails
+     */
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream = null;
+            try {
+                Object object = exchange.getIn().getMandatoryBody();
+                long objectSize;
+
+                // Need to check if the message body is WrappedFile
+                if (object instanceof WrappedFile) {
+                    object = ((WrappedFile<?>) object).getFile();
+                }
+                if (object instanceof File) {
+                    filePayload = (File) object;
+                    inputStream = new FileInputStream(filePayload);
+                    // The file length is exact; available() is an int-bounded estimate.
+                    objectSize = filePayload.length();
+                } else {
+                    inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                    if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                        if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                            ByteArrayOutputStream baos = determineLengthInputStream(inputStream);
+                            objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                            inputStream = new ByteArrayInputStream(baos.toByteArray());
+                        } else {
+                            if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                                objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                            }
+                        }
+                    }
+                    // NOTE(review): available() is only an estimate for arbitrary streams - confirm
+                    // whether a known Content-Length should be preferred as the object size.
+                    objectSize = inputStream.available();
+                }
+
+                PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                        .stream(inputStream, objectSize, -1)
+                        .bucket(bucketName)
+                        .object(objectName)
+                        .userMetadata(objectMetadata);
+
+                if (!extraHeaders.isEmpty()) {
+                    putObjectRequest.extraHeaders(extraHeaders);
+                }
+
+                LOG.trace("Put object from exchange...");
+
+                // Use the client handed in by the caller, like every other operation does.
+                ObjectWriteResponse putObjectResult = minioClient.putObject(putObjectRequest.build());
+
+                LOG.trace("Received result...");
+
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            } finally {
+                // Release the stream even when the upload fails, so file handles do not leak.
+                IOHelper.close(inputStream);
+            }
+
+            // Reached only after a successful upload: a failure above propagates before this point.
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);

Review comment:
       `versionId` is determined here but never used.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Creates the client factory, keeping the given configuration for later use
+     * by {@link #getMinioClient()}.
+     *
+     * @param configuration the component configuration read when the client is built
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       I wonder: are the access keys, like the region, not required?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    /**
+     * Creates a producer bound to the given endpoint.
+     *
+     * @param endpoint the endpoint this producer sends to
+     */
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Returns the message the operations of this producer write their results to,
+     * i.e. {@code exchange.getMessage()}.
+     *
+     * @param exchange the exchange being processed
+     * @return the message to populate with result headers and body
+     */
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    /**
+     * Runs the operation resolved for the exchange against the endpoint's client.
+     * When no operation is resolved, the message body is uploaded with putObject.
+     *
+     * @param exchange the exchange to process
+     * @throws IllegalArgumentException if the resolved operation has no handler here
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        final MinioOperations requested = determineOperation(exchange);
+        final MinioClient client = getEndpoint().getMinioClient();
+
+        if (ObjectHelper.isEmpty(requested)) {
+            putObject(client, exchange);
+            return;
+        }
+
+        switch (requested) {
+            case copyObject:
+                copyObject(client, exchange);
+                break;
+            case deleteObject:
+                deleteObject(client, exchange);
+                break;
+            case deleteObjects:
+                deleteObjects(client, exchange);
+                break;
+            case listBuckets:
+                listBuckets(client, exchange);
+                break;
+            case deleteBucket:
+                deleteBucket(client, exchange);
+                break;
+            case listObjects:
+                listObjects(client, exchange);
+                break;
+            case getObject:
+                getObject(client, exchange);
+                break;
+            case getPartialObject:
+                getPartialObject(client, exchange);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    /**
+     * Uploads the message body as an object.
+     * <p>
+     * With POJO requests enabled the body must be a {@link PutObjectArgs.Builder}, which is built
+     * and sent as-is. Otherwise the body is read as a {@link File} (directly or wrapped in a
+     * {@link WrappedFile}) or as an {@link InputStream}, and bucket, object name, metadata and
+     * extra headers are resolved from the exchange and the endpoint configuration.
+     * <p>
+     * The returned ETag, and the version id when one is returned, are set as headers on the
+     * response message.
+     *
+     * @param minioClient the client used to perform the upload
+     * @param exchange    the exchange carrying the payload and headers
+     * @throws Exception if the body is missing or cannot be converted, or the upload fails
+     */
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream = null;
+            try {
+                Object object = exchange.getIn().getMandatoryBody();
+                long objectSize;
+
+                // Need to check if the message body is WrappedFile
+                if (object instanceof WrappedFile) {
+                    object = ((WrappedFile<?>) object).getFile();
+                }
+                if (object instanceof File) {
+                    filePayload = (File) object;
+                    inputStream = new FileInputStream(filePayload);
+                    // The file length is exact; available() is an int-bounded estimate.
+                    objectSize = filePayload.length();
+                } else {
+                    inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                    if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                        if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                            ByteArrayOutputStream baos = determineLengthInputStream(inputStream);
+                            objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                            inputStream = new ByteArrayInputStream(baos.toByteArray());
+                        } else {
+                            if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                                objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                            }
+                        }
+                    }
+                    // NOTE(review): available() is only an estimate for arbitrary streams - confirm
+                    // whether a known Content-Length should be preferred as the object size.
+                    objectSize = inputStream.available();
+                }
+
+                PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                        .stream(inputStream, objectSize, -1)
+                        .bucket(bucketName)
+                        .object(objectName)
+                        .userMetadata(objectMetadata);
+
+                if (!extraHeaders.isEmpty()) {
+                    putObjectRequest.extraHeaders(extraHeaders);
+                }
+
+                LOG.trace("Put object from exchange...");
+
+                // Use the client handed in by the caller, like every other operation does.
+                ObjectWriteResponse putObjectResult = minioClient.putObject(putObjectRequest.build());
+
+                LOG.trace("Received result...");
+
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            } finally {
+                // Release the stream even when the upload fails, so file handles do not leak.
+                IOHelper.close(inputStream);
+            }
+
+            // Reached only after a successful upload: a failure above propagates before this point.
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    /**
+     * Removes a single object and sets the response body to {@code true}.
+     * <p>
+     * In POJO mode the body is a {@link RemoveObjectArgs.Builder} that already carries the
+     * target, so no headers are consulted. Otherwise bucket, object name and the optional
+     * version id are resolved from the exchange and the endpoint configuration.
+     *
+     * @param minioClient the client used to perform the removal
+     * @param exchange    the exchange carrying the request
+     * @throws Exception if the request cannot be resolved or the removal fails
+     */
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            // Resolved only on this path: the POJO request above does not use them, and
+            // resolving eagerly would throw when the bucket or key is not configured.
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String versionId = determineVersionId(exchange);
+
+            RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            // Target a specific version only when one was supplied.
+            if (ObjectHelper.isNotEmpty(versionId)) {
+                removeObjectRequest.versionId(versionId);
+            }
+
+            minioClient.removeObject(removeObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    /**
+     * Removes several objects described by a {@link RemoveObjectsArgs.Builder} body and sets
+     * the response body to {@code true}. Only supported for POJO requests.
+     * <p>
+     * The client returns a lazily evaluated result, so it is iterated here to make the
+     * removal actually happen; per-object failures reported by the server are logged.
+     *
+     * @param minioClient the client used to perform the removal
+     * @param exchange    the exchange carrying the request
+     * @throws IllegalArgumentException if POJO requests are not enabled
+     * @throws Exception                if the body is missing or the removal fails
+     */
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                // Nothing is removed until the returned Iterable is consumed.
+                for (Result<?> result : minioClient.removeObjects(payload.build())) {
+                    // Each element produced by the iteration describes one failed deletion.
+                    LOG.warn("Failed to delete object: {}", result.get());
+                }
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    /**
+     * Removes a bucket and sets the response body to {@code "ok"}.
+     * <p>
+     * In POJO mode the body is a {@link RemoveBucketArgs.Builder} that already names the
+     * bucket; otherwise the bucket name is resolved from the exchange and the endpoint
+     * configuration.
+     *
+     * @param minioClient the client used to perform the removal
+     * @param exchange    the exchange carrying the request
+     * @throws Exception if the request cannot be resolved or the removal fails
+     */
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+            // Resolved only on this path: the POJO request above does not use it, and
+            // resolving eagerly would throw when no bucket name is configured.
+            final String bucketName = determineBucketName(exchange);
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {
+            throw new IllegalArgumentException("Minio Bucket name header is missing or not configured.");
+        }
+
+        return bucketName;
+    }
+
+    /**
+     * Resolves the object name: the message header wins, and the configured key
+     * name is used when the header is empty.
+     *
+     * @throws IllegalArgumentException if neither source yields an object name
+     */
+    private String determineObjectName(final Exchange exchange) {
+        String resolved = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+        if (ObjectHelper.isEmpty(resolved)) {
+            resolved = getConfiguration().getKeyName();
+        }
+        if (resolved != null) {
+            return resolved;
+        }
+        throw new IllegalArgumentException("Minio Key header is missing.");
+    }
+
+    private String determineStorageClass(final Exchange exchange) {
+        String storageClass = exchange.getIn().getHeader(MinioConstants.STORAGE_CLASS, String.class);
+        if (storageClass == null) {
+            storageClass = getConfiguration().getStorageClass();
+        }
+
+        return storageClass;
+    }
+
+    private String determineVersionId(final Exchange exchange) {
+        String versionId = exchange.getIn().getHeader(MinioConstants.VERSION_ID, String.class);
+        if (versionId == null) {
+            versionId = getConfiguration().getVersionId();
+        }
+
+        return versionId;
+    }
+
+    private ByteArrayOutputStream determineLengthInputStream(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] bytes = new byte[1024];

Review comment:
       Why is it `1024` bytes here? Can you please move this into a constant, with a comment explaining why you opted for `1024` bytes?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    /**
+     * Creates a producer for the given endpoint.
+     *
+     * @param endpoint the endpoint passed on to the {@code DefaultProducer} superclass
+     */
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Returns the message on which operation results (headers and body) are set.
+     *
+     * @param exchange the exchange being processed
+     * @return the result of {@code exchange.getMessage()}
+     */
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    /**
+     * Dispatches the exchange to the handler of the requested Minio operation.
+     * When no operation is given (neither as header nor in the configuration)
+     * the exchange is treated as a put-object request.
+     *
+     * @param exchange the exchange to process
+     * @throws IllegalArgumentException if the operation has no handler here
+     * @throws Exception whatever the selected handler throws
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        final MinioOperations requested = determineOperation(exchange);
+        final MinioClient client = getEndpoint().getMinioClient();
+
+        // No explicit operation: default to uploading the body.
+        if (ObjectHelper.isEmpty(requested)) {
+            putObject(client, exchange);
+            return;
+        }
+
+        switch (requested) {
+            case copyObject:
+                copyObject(client, exchange);
+                break;
+            case deleteObject:
+                deleteObject(client, exchange);
+                break;
+            case deleteObjects:
+                deleteObjects(client, exchange);
+                break;
+            case listBuckets:
+                listBuckets(client, exchange);
+                break;
+            case deleteBucket:
+                deleteBucket(client, exchange);
+                break;
+            case listObjects:
+                listObjects(client, exchange);
+                break;
+            case getObject:
+                getObject(client, exchange);
+                break;
+            case getPartialObject:
+                getPartialObject(client, exchange);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    /**
+     * Deletes a single object and sets the response body to {@code true}.
+     * <p>
+     * In POJO mode the body must be a {@code RemoveObjectArgs.Builder}, which
+     * already carries bucket, object and version; nothing is resolved from
+     * headers or configuration in that case. Otherwise bucket, object name
+     * and (optional) version id are resolved from the exchange and the
+     * configuration; a resolved version id is applied to the request so that
+     * the requested version is the one removed.
+     *
+     * @param minioClient the client performing the removal
+     * @param exchange    the exchange carrying the request
+     * @throws IllegalArgumentException if bucket or object name cannot be
+     *             resolved in non-POJO mode
+     * @throws Exception if the body is invalid or the removal fails
+     */
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            // Resolved only here: in POJO mode these values are unused and
+            // resolving them could fail although the request is complete.
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String versionId = determineVersionId(exchange);
+
+            RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+            if (versionId != null) {
+                removeObjectRequest.versionId(versionId);
+            }
+
+            minioClient.removeObject(removeObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    /**
+     * Deletes several objects in one request. Only supported in POJO mode,
+     * where the body must be a {@code RemoveObjectsArgs.Builder}.
+     * <p>
+     * The response body is {@code true} when the server reported no
+     * per-object failure and {@code false} otherwise; each failure is logged
+     * at WARN level.
+     *
+     * @param minioClient the client performing the removal
+     * @param exchange    the exchange carrying the request
+     * @throws IllegalArgumentException if POJO mode is not enabled
+     * @throws Exception if the body is invalid or the request fails
+     */
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                // removeObjects is lazy: the request is only sent while the
+                // returned Iterable is being iterated, and every element it
+                // yields describes an object that could NOT be removed.
+                int failures = 0;
+                for (Result<?> failure : minioClient.removeObjects(payload.build())) {
+                    LOG.warn("Failed to delete object: {}", failure.get());
+                    failures++;
+                }
+                Message message = getMessageForResponse(exchange);
+                message.setBody(failures == 0);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    /**
+     * Deletes a bucket and sets the response body to {@code "ok"}.
+     * <p>
+     * In POJO mode the body must be a {@code RemoveBucketArgs.Builder} and
+     * the bucket name is taken from it alone. Otherwise the bucket name is
+     * resolved from the exchange header or the configuration.
+     *
+     * @param minioClient the client performing the removal
+     * @param exchange    the exchange carrying the request
+     * @throws IllegalArgumentException if the bucket name cannot be resolved
+     *             in non-POJO mode
+     * @throws Exception if the body is invalid or the removal fails
+     */
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+            // Resolved only here: a POJO request carries its own bucket name,
+            // so a missing header/configuration must not make it fail.
+            final String bucketName = determineBucketName(exchange);
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {

Review comment:
       This check is not needed here: `ObjectHelper.isEmpty(bucketName)` already checks whether the string is empty or null.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;

Review comment:
       Please avoid wildcard import

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    /**
+     * Creates a consumer for the given endpoint.
+     *
+     * @param endpoint  the Minio endpoint passed on to the superclass
+     * @param processor the processor passed on to the superclass
+     */
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /**
+     * Polls the configured bucket once.
+     * <p>
+     * When an object name is configured only that object is fetched.
+     * Otherwise the bucket is listed with the configured options (delimiter,
+     * prefix, start-after, max keys), resuming from the continuation token
+     * remembered from the previous poll, and one exchange is created per
+     * listed object. The resulting exchanges are handed to
+     * {@link #processBatch}.
+     *
+     * @return the value returned by {@code processBatch}
+     * @throws Exception if listing or fetching objects fails
+     */
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            // NOTE(review): elsewhere in this change listObjects(...) is
+            // assigned to Iterable<Result<Item>>; confirm this cast can succeed.
+            ListBucketResultV2 listObjects = (ListBucketResultV2) minioClient.listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                // store the new marker first so the trace shows the token the
+                // next poll will resume from, not the one just consumed
+                continuationToken = listObjects.nextContinuationToken();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    /**
+     * Downloads one object, applying the optional download settings of the
+     * endpoint configuration to the request.
+     *
+     * @param bucketName  the bucket holding the object
+     * @param minioClient the client used for the download
+     * @param objectName  the name of the object to fetch
+     * @return the content stream returned by the client
+     * @throws Exception if the request cannot be built or the download fails
+     */
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        // Each optional setting is applied only when it is configured:
+        // non-null for references, non-zero for offset and length.
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    /**
+     * Processes the polled exchanges one by one for as long as
+     * {@code isBatchAllowed()} holds. Each exchange gets the batch
+     * index/size/complete properties and an on-completion callback that
+     * calls {@code processCommit} on success and {@code processRollback} on
+     * failure, and is then handed to the async processor.
+     *
+     * @param exchanges the queue of exchanges created by the current poll
+     * @return the size of the queue on entry, even if the loop stopped early
+     */
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                // successful completion: run the commit strategy
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                // failed completion: run the rollback strategy
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       Should we handle this exception as well?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    /**
+     * Creates a consumer by handing the endpoint and the processor to the superclass constructor.
+     *
+     * @param endpoint the Minio endpoint this consumer belongs to
+     * @param processor the processor passed on to the superclass
+     */
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /**
+     * Polls the configured bucket and hands the resulting exchanges to {@code processBatch}.
+     * <p>
+     * When an object name is configured only that object is fetched; otherwise the bucket is
+     * listed with the listing options taken from the configuration, resuming from the
+     * continuation token remembered from the previous poll, if any.
+     *
+     * @return the value returned by {@code processBatch} for the exchanges created in this poll
+     * @throws Exception if fetching or listing objects fails
+     */
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            // NOTE(review): this cast assumes listObjects returns a ListBucketResultV2 - confirm against the MinIO client API
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                // store the new token before logging, so the trace line reports the marker used by the next poll
+                continuationToken = listObjects.nextContinuationToken();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    /**
+     * Wraps a single object stream into a one-element queue of exchanges.
+     * <p>
+     * The stream is closed before this method returns, also when creating the exchange fails.
+     *
+     * @param objectStream the content stream of the object
+     * @param objectName the name of the object
+     * @return a queue holding the exchange created for the object
+     * @throws Exception if the endpoint fails to create the exchange
+     */
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            answer.add(getEndpoint().createExchange(objectStream, objectName));
+        } finally {
+            // close on the failure path too, otherwise the stream leaks when createExchange throws
+            IOHelper.close(objectStream);
+        }
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {

Review comment:
       I am a bit uncomfortable with checking for "not equal to 0"; odd cases like negative values could slip through. I'd recommend changing this to `> 0` instead. Same for the other checks.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {

Review comment:
       This class name is weird for the client impl. If this is the remote client from the factory, something like `MinioRemoteClientImpl` would make more sense.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    /**
+     * Creates a consumer by handing the endpoint and the processor to the superclass constructor.
+     *
+     * @param endpoint the Minio endpoint this consumer belongs to
+     * @param processor the processor passed on to the superclass
+     */
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    /**
+     * Polls the configured bucket and hands the resulting exchanges to {@code processBatch}.
+     * <p>
+     * When an object name is configured only that object is fetched; otherwise the bucket is
+     * listed with the listing options taken from the configuration, resuming from the
+     * continuation token remembered from the previous poll, if any.
+     *
+     * @return the value returned by {@code processBatch} for the exchanges created in this poll
+     * @throws Exception if fetching or listing objects fails
+     */
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            // NOTE(review): this cast assumes listObjects returns a ListBucketResultV2 - confirm against the MinIO client API
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                // store the new token before logging, so the trace line reports the marker used by the next poll
+                continuationToken = listObjects.nextContinuationToken();
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    /**
+     * Wraps a single object stream into a one-element queue of exchanges.
+     * <p>
+     * The stream is closed before this method returns, also when creating the exchange fails.
+     *
+     * @param objectStream the content stream of the object
+     * @param objectName the name of the object
+     * @return a queue holding the exchange created for the object
+     * @throws Exception if the endpoint fails to create the exchange
+     */
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            answer.add(getEndpoint().createExchange(objectStream, objectName));
+        } finally {
+            // close on the failure path too, otherwise the stream leaks when createExchange throws
+            IOHelper.close(objectStream);
+        }
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    /**
+     * Fetches an object from the given bucket, applying the optional read settings from the configuration.
+     * <p>
+     * Offset and length are only applied when they are positive, so zero or negative
+     * configuration values are never forwarded to the request builder.
+     *
+     * @param bucketName the bucket to read from
+     * @param minioClient the client used to perform the request
+     * @param objectName the name of the object to fetch
+     * @return the content stream of the object
+     * @throws Exception if the client fails to fetch the object
+     */
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        // "> 0" rather than "!= 0": a negative value must not slip through to the request
+        if (getConfiguration().getOffset() > 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() > 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    /**
+     * Processes the polled exchanges one by one for as long as the batch is allowed to continue.
+     * <p>
+     * Each exchange gets the batch index, size and completion flag as properties, plus an
+     * on-completion hook that commits on success and rolls back on failure.
+     *
+     * @param exchanges the exchanges created by the current poll
+     * @return the number of exchanges that were in the queue when the batch started
+     */
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        final int total = exchanges.size();
+        int index = 0;
+
+        // only loop if we are started (allowed to run)
+        while (index < total && isBatchAllowed()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            final boolean lastInBatch = index == total - 1;
+
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, lastInBatch);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            final Synchronization afterWork = new Synchronization() {
+                @Override
+                public void onComplete(Exchange completed) {
+                    processCommit(completed);
+                }
+
+                @Override
+                public void onFailure(Exchange failed) {
+                    processRollback(failed);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            };
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(afterWork);
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+
+            index++;
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     * <p>
+     * When move-after-read is enabled the object is first copied to the destination bucket.
+     * When either move-after-read or delete-after-read is enabled the source object is then removed.
+     * Any failure is reported to the exception handler and is not rethrown.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                // the governance bypass flag is set here, so no separate branch for it is needed below
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            // report unexpected failures too, instead of hiding them behind a trace line without the cause
+            getExceptionHandler().handleException("Error occurred while committing the exchange. This exception is ignored.",
+                    exchange, e);
+        }
+    }
+
+    private void copyObject(String srcBucketName, String srcObjectName) {
+        String destinationBucketName = getConfiguration().getDestinationBucketName();
+        if (destinationBucketName == null) {
+            throw new IllegalArgumentException("Destination Bucket name must be specified to copy operation");
+        }
+
+        try {
+            // set destination object name as source object name, if not specified
+            String destinationObjectName = (getConfiguration().getDestinationObjectName() != null)
+                    ? getConfiguration().getDestinationObjectName()
+                    : srcObjectName;
+
+
+            LOG.trace("Copying object from bucket {} with objectName {} to bucket {}...",
+                    srcBucketName, srcObjectName, destinationBucketName);
+
+            CopySource.Builder copySourceBuilder = CopySource.builder().bucket(srcBucketName).object(srcObjectName);
+            if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+                copySourceBuilder.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+            }
+            if (getConfiguration().getOffset() != 0) {
+                copySourceBuilder.offset(getConfiguration().getOffset());
+            }
+            if (getConfiguration().getLength() != 0) {
+                copySourceBuilder.length(getConfiguration().getLength());
+            }
+            if (getConfiguration().getVersionId() != null) {
+                copySourceBuilder.versionId(getConfiguration().getVersionId());
+            }
+            if (getConfiguration().getMatchETag() != null) {
+                copySourceBuilder.matchETag(getConfiguration().getMatchETag());
+            }
+            if (getConfiguration().getNotMatchETag() != null) {
+                copySourceBuilder.notMatchETag(getConfiguration().getNotMatchETag());
+            }
+            if (getConfiguration().getModifiedSince() != null) {
+                copySourceBuilder.modifiedSince(getConfiguration().getModifiedSince());
+            }
+            if (getConfiguration().getUnModifiedSince() != null) {
+                copySourceBuilder.unmodifiedSince(getConfiguration().getUnModifiedSince());
+            }
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .source(copySourceBuilder.build())
+                    .bucket(getConfiguration().getDestinationBucketName())
+                    .object(destinationObjectName);
+
+            if (getConfiguration().getServerSideEncryption() != null) {
+                copyObjectRequest.sse(getConfiguration().getServerSideEncryption());
+            }
+
+            getMinioClient().copyObject(copyObjectRequest.build());
+
+        } catch (Exception e) {
+            LOG.warn("Error copy object from bucket {} with objectName {} to bucket {}...",

Review comment:
       Same here: should we handle this exception as well?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    /**
+     * Creates a producer for the given endpoint.
+     *
+     * @param endpoint the endpoint this producer belongs to; handed unchanged to the superclass constructor
+     */
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Looks up the message on which an operation publishes its result.
+     *
+     * @param  exchange the exchange currently being processed
+     * @return          the message returned by {@link Exchange#getMessage()}
+     */
+    public static Message getMessageForResponse(final Exchange exchange) {
+        final Message responseMessage = exchange.getMessage();
+        return responseMessage;
+    }
+
+    /**
+     * Dispatches the exchange to the handler of the requested Minio operation.
+     * <p>
+     * The operation is resolved by {@code determineOperation}; when none is set the exchange is treated as an upload
+     * and handed to {@code putObject}.
+     *
+     * @param  exchange                 the exchange to process
+     * @throws IllegalArgumentException if the resolved operation has no handler in this producer
+     * @throws Exception                whatever the selected handler throws
+     */
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        final MinioOperations requested = determineOperation(exchange);
+        final MinioClient client = getEndpoint().getMinioClient();
+
+        // No explicit operation: uploading is the default behaviour.
+        if (ObjectHelper.isEmpty(requested)) {
+            putObject(client, exchange);
+            return;
+        }
+
+        if (requested == MinioOperations.copyObject) {
+            copyObject(client, exchange);
+        } else if (requested == MinioOperations.deleteObject) {
+            deleteObject(client, exchange);
+        } else if (requested == MinioOperations.deleteObjects) {
+            deleteObjects(client, exchange);
+        } else if (requested == MinioOperations.listBuckets) {
+            listBuckets(client, exchange);
+        } else if (requested == MinioOperations.deleteBucket) {
+            deleteBucket(client, exchange);
+        } else if (requested == MinioOperations.listObjects) {
+            listObjects(client, exchange);
+        } else if (requested == MinioOperations.getObject) {
+            getObject(client, exchange);
+        } else if (requested == MinioOperations.getPartialObject) {
+            getPartialObject(client, exchange);
+        } else {
+            throw new IllegalArgumentException("Unsupported operation");
+        }
+    }
+
+    /**
+     * Uploads the message body to Minio and publishes the resulting ETag (and version id, when present) as headers
+     * on the response message.
+     * <p>
+     * In POJO mode the body must be a {@link PutObjectArgs.Builder}, which is built and sent as is. Otherwise the body
+     * is read as a {@link File} (directly or wrapped in a {@link WrappedFile}) or converted to an {@link InputStream},
+     * and the request is assembled from the bucket name, object name, metadata and extra headers resolved from the
+     * exchange.
+     * <p>
+     * The stream opened for the upload is always closed, including when the upload fails. The source file is deleted
+     * only after a successful upload and only when {@code deleteAfterWrite} is enabled.
+     *
+     * @param  minioClient the client used to perform the upload
+     * @param  exchange    the exchange carrying the payload and the request headers
+     * @throws Exception   if the body is missing or cannot be converted, or if the upload fails
+     */
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream = null;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+
+            try {
+                // Exact payload size when it can be known up front; negative while it is still unknown.
+                long objectSize = -1;
+
+                if (object instanceof File) {
+                    filePayload = (File) object;
+                    inputStream = new FileInputStream(filePayload);
+                    // FileInputStream.available() is an int and cannot describe files larger than 2 GiB.
+                    objectSize = filePayload.length();
+                } else {
+                    inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                    if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                        if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                            InputStream bodyStream = inputStream;
+                            ByteArrayOutputStream baos;
+                            try {
+                                baos = determineLengthInputStream(bodyStream);
+                            } finally {
+                                // The body stream is replaced by the in-memory copy below, so release it here.
+                                IOHelper.close(bodyStream);
+                            }
+                            objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                            inputStream = new ByteArrayInputStream(baos.toByteArray());
+                            objectSize = baos.size();
+                        } else {
+                            if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                                objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                            }
+                        }
+                    }
+                }
+
+                if (objectSize < 0) {
+                    // NOTE(review): available() is only an estimate of the bytes readable without blocking;
+                    // kept as the fallback for streams whose size is not otherwise known.
+                    objectSize = inputStream.available();
+                }
+
+                PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                        .stream(inputStream, objectSize, -1)
+                        .bucket(bucketName)
+                        .object(objectName)
+                        .userMetadata(objectMetadata);
+
+                if (!extraHeaders.isEmpty()) {
+                    putObjectRequest.extraHeaders(extraHeaders);
+                }
+
+                LOG.trace("Put object from exchange...");
+
+                // Use the client handed to this method, as the POJO branch does.
+                ObjectWriteResponse putObjectResult = minioClient.putObject(putObjectRequest.build());
+
+                LOG.trace("Received result...");
+
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            } finally {
+                // Release the stream whether or not the upload succeeded.
+                IOHelper.close(inputStream);
+            }
+
+            // Only reached after a successful upload, with the file stream already closed.
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    /**
+     * Copies an object to another bucket and/or object name.
+     * <p>
+     * In POJO mode the body must be a {@link CopyObjectArgs.Builder}; the write response becomes the message body.
+     * Otherwise the source is resolved from the exchange, the destination is read from the
+     * {@code DESTINATION_BUCKET_NAME} and {@code DESTINATION_OBJECT_NAME} headers, and the version id of the copy
+     * (when present) is published as a header.
+     *
+     * @param  minioClient              the client used to perform the copy
+     * @param  exchange                 the exchange carrying the request
+     * @throws IllegalArgumentException if the destination bucket or destination object name is missing
+     * @throws Exception                if the body is missing in POJO mode or the copy fails
+     */
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            final CopyObjectArgs.Builder pojoRequest = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (pojoRequest == null) {
+                return;
+            }
+            final ObjectWriteResponse pojoResponse = minioClient.copyObject(pojoRequest.build());
+            final Message pojoMessage = getMessageForResponse(exchange);
+            pojoMessage.setBody(pojoResponse);
+            return;
+        }
+
+        final String sourceBucket = determineBucketName(exchange);
+        final String sourceObject = determineObjectName(exchange);
+        final String targetObject = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+        final String targetBucket = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(targetBucket)) {
+            throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+        }
+        if (ObjectHelper.isEmpty(targetObject)) {
+            throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+        }
+
+        final CopySource.Builder source = CopySource.builder();
+        source.bucket(sourceBucket);
+        source.object(sourceObject);
+
+        final CopyObjectArgs.Builder request = CopyObjectArgs.builder();
+        request.bucket(targetBucket);
+        request.object(targetObject);
+        request.source(source.build());
+
+        final ObjectWriteResponse response = minioClient.copyObject(request.build());
+
+        final Message message = getMessageForResponse(exchange);
+        final String copiedVersionId = response.versionId();
+        if (copiedVersionId != null) {
+            message.setHeader(MinioConstants.VERSION_ID, response.versionId());
+        }
+    }
+
+    /**
+     * Deletes a single object and sets the message body to {@code true}.
+     * <p>
+     * In POJO mode the body must be a {@link RemoveObjectArgs.Builder}, which is built and sent as is. Otherwise the
+     * request targets the bucket and object resolved from the exchange; when a version id is resolved as well, it is
+     * applied so that the given version is removed.
+     *
+     * @param  minioClient the client used to perform the removal
+     * @param  exchange    the exchange carrying the request
+     * @throws Exception   if the body is missing in POJO mode or the removal fails
+     */
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            // The version id was resolved but previously never sent with the request.
+            if (ObjectHelper.isNotEmpty(versionId)) {
+                removeObjectRequest.versionId(versionId);
+            }
+
+            minioClient.removeObject(removeObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    /**
+     * Deletes several objects in one request. Only supported in POJO mode, where the body must be a
+     * {@link RemoveObjectsArgs.Builder}.
+     * <p>
+     * The Minio client performs the removal lazily, while the returned iterable is consumed, so the result is
+     * iterated here to make the deletion actually happen. Each element that comes back describes an object that
+     * could not be deleted; it is logged, and the message body is set to {@code true} only when no such element was
+     * reported.
+     *
+     * @param  minioClient              the client used to perform the removal
+     * @param  exchange                 the exchange carrying the request
+     * @throws IllegalArgumentException if the endpoint is not configured for POJO requests
+     * @throws Exception                if the body is missing or the removal request fails
+     */
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                boolean allDeleted = true;
+                // Iterating is what triggers the removal; every item is a per-object failure report.
+                for (Result<?> failure : minioClient.removeObjects(payload.build())) {
+                    LOG.warn("Failed to delete object: {}", failure.get());
+                    allDeleted = false;
+                }
+                Message message = getMessageForResponse(exchange);
+                message.setBody(allDeleted);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    /**
+     * Removes a bucket and sets the message body to {@code "ok"}.
+     * <p>
+     * In POJO mode the body must be a {@link RemoveBucketArgs.Builder}; otherwise the bucket resolved from the
+     * exchange is removed.
+     *
+     * @param  minioClient the client used to remove the bucket
+     * @param  exchange    the exchange carrying the request
+     * @throws Exception   if the body is missing in POJO mode or the removal fails
+     */
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String targetBucket = determineBucketName(exchange);
+
+        final RemoveBucketArgs request;
+        if (getConfiguration().isPojoRequest()) {
+            final RemoveBucketArgs.Builder pojoRequest = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (pojoRequest == null) {
+                return;
+            }
+            request = pojoRequest.build();
+        } else {
+            request = RemoveBucketArgs.builder().bucket(targetBucket).build();
+        }
+
+        minioClient.removeBucket(request);
+        final Message response = getMessageForResponse(exchange);
+        response.setBody("ok");
+    }
+
+    /**
+     * Fetches an object and sets its content stream as the message body.
+     * <p>
+     * In POJO mode the body must be a {@link GetObjectArgs.Builder}; otherwise the bucket and object name are
+     * resolved from the exchange.
+     *
+     * @param  minioClient the client used to fetch the object
+     * @param  exchange    the exchange carrying the request
+     * @throws Exception   if the body is missing in POJO mode or the download fails
+     */
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        final GetObjectArgs request;
+        if (getConfiguration().isPojoRequest()) {
+            final GetObjectArgs.Builder pojoRequest = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (pojoRequest == null) {
+                return;
+            }
+            request = pojoRequest.build();
+        } else {
+            final String sourceBucket = determineBucketName(exchange);
+            final String sourceObject = determineObjectName(exchange);
+            final GetObjectArgs.Builder builder = GetObjectArgs.builder();
+            builder.bucket(sourceBucket);
+            builder.object(sourceObject);
+            request = builder.build();
+        }
+
+        final InputStream content = minioClient.getObject(request);
+        final Message response = getMessageForResponse(exchange);
+        response.setBody(content);
+    }
+
+    /**
+     * Fetches a byte range of an object and sets the resulting stream as the message body.
+     * <p>
+     * In POJO mode the body must be a {@link GetObjectArgs.Builder}. Otherwise the bucket and object name are
+     * resolved from the exchange and the range is taken from the {@code OFFSET} and {@code LENGTH} headers, both of
+     * which are required and parsed as {@code long} values.
+     *
+     * @param  minioClient              the client used to fetch the object
+     * @param  exchange                 the exchange carrying the request
+     * @throws IllegalArgumentException if the offset or length header is missing or empty
+     * @throws Exception                if the body is missing in POJO mode or the download fails
+     */
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        final GetObjectArgs request;
+        if (getConfiguration().isPojoRequest()) {
+            final GetObjectArgs.Builder pojoRequest = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (pojoRequest == null) {
+                return;
+            }
+            request = pojoRequest.build();
+        } else {
+            final String sourceBucket = determineBucketName(exchange);
+            final String sourceObject = determineObjectName(exchange);
+            final String rangeStart = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String rangeLength = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            final boolean rangeComplete = !ObjectHelper.isEmpty(rangeStart) && !ObjectHelper.isEmpty(rangeLength);
+            if (!rangeComplete) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            final GetObjectArgs.Builder builder = GetObjectArgs.builder();
+            builder.bucket(sourceBucket);
+            builder.object(sourceObject);
+            builder.offset(Long.parseLong(rangeStart));
+            builder.length(Long.parseLong(rangeLength));
+            request = builder.build();
+        }
+
+        final InputStream content = minioClient.getObject(request);
+        final Message response = getMessageForResponse(exchange);
+        response.setBody(content);
+    }
+
+    /**
+     * Lists the objects of a bucket and sets the resulting iterable as the message body.
+     * <p>
+     * In POJO mode the body must be a {@link ListObjectsArgs.Builder}; otherwise the bucket is resolved from the
+     * exchange and the configured {@code recursive} flag is applied.
+     *
+     * @param  minioClient             the client used to list the objects
+     * @param  exchange                the exchange carrying the request
+     * @throws InvalidPayloadException if the body is missing or of the wrong type in POJO mode
+     */
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        final ListObjectsArgs request;
+        if (getConfiguration().isPojoRequest()) {
+            final ListObjectsArgs.Builder pojoRequest = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (pojoRequest == null) {
+                return;
+            }
+            request = pojoRequest.build();
+        } else {
+            final String sourceBucket = determineBucketName(exchange);
+            final ListObjectsArgs.Builder builder = ListObjectsArgs.builder();
+            builder.bucket(sourceBucket);
+            builder.recursive(getConfiguration().isRecursive());
+            request = builder.build();
+        }
+
+        final Iterable<Result<Item>> items = minioClient.listObjects(request);
+        final Message response = getMessageForResponse(exchange);
+        response.setBody(items);
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {

Review comment:
       As I mentioned earlier, `isNotEmpty` would make more sense here than a plain null check.

##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioComponentConfigurationTest.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MinioComponentConfigurationTest extends CamelTestSupport {
+
+    @Test
+    public void createEndpointWithMinimalConfiguration() throws Exception {
+        MinioComponent component = context.getComponent("minio", MinioComponent.class);
+        MinioEndpoint endpoint = (MinioEndpoint) component
+                .createEndpoint("minio://TestDomain?accessKey=xxx&secretKey=yyy&region=us-west-1&endpoint=http://localhost:4572");
+        assertEquals(endpoint.getConfiguration().getEndpoint(), "http://localhost:4572");

Review comment:
       Please add more test cases, e.g. for region, secretKey, etc.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org