You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/03/22 12:46:18 UTC

[camel] 01/01: Azure Data Lake - CAMEL-19152 additional file client header field + CAMEL-18636 additional auth options (#9567)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch azure-datalake-backport
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 80c649eecf559da0f01a56fef95d85e4b0f6a31d
Author: Andreas Klug <an...@de.bosch.com>
AuthorDate: Mon Mar 20 07:57:02 2023 +0100

    Azure Data Lake - CAMEL-19152 additional file client header field + CAMEL-18636 additional auth options (#9567)
    
    * CAMEL-19152 - Add file client header and flush option
    
    * CAMEL-18636: Allow string based auth + additional auth methods
    
    * CAMEL-18636: Add security label to config parameters
    
    * CAMEL-18636: Fixed setting SAS credentials from registry
    
    * camel-jbang - Fix NPE
    
    * CAMEL-19152 - Add file client header and flush option
    
    * CAMEL-18636: rebase - removed validate method from component
    
    * CAMEL-18636: Add security label to config parameters
    
    * CAMEL-18636: Fixed setting SAS credentials from registry
    
    ---------
    
    Co-authored-by: Andreas Klug (BD/XDE1) <KG...@fe-c-014an.fritz.box>
    Co-authored-by: Claus Ibsen <cl...@gmail.com>
---
 .../azure/storage/datalake/DataLakeComponent.java  |  9 ++--
 .../storage/datalake/DataLakeConfiguration.java    | 37 +++++++++++--
 .../DataLakeConfigurationOptionsProxy.java         |  9 ++++
 .../azure/storage/datalake/DataLakeConstants.java  |  4 ++
 .../storage/datalake/DataLakeExchangeHeaders.java  | 13 +++++
 .../datalake/client/DataLakeClientFactory.java     | 48 ++++++++++++++++-
 .../datalake/client/DataLakeFileClientWrapper.java |  5 +-
 .../operations/DataLakeDirectoryOperations.java    |  6 ++-
 .../operations/DataLakeFileOperations.java         | 63 ++++++++++++++--------
 .../operations/DataLakeFileOperationTest.java      | 28 ++++++++++
 10 files changed, 189 insertions(+), 33 deletions(-)

diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
index 7e6178a86b6..aae9545d02a 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.azure.storage.datalake;
 import java.util.Map;
 import java.util.Set;
 
+import com.azure.core.credential.AzureSasCredential;
 import com.azure.identity.ClientSecretCredential;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import org.apache.camel.CamelContext;
@@ -85,16 +86,18 @@ public class DataLakeComponent extends DefaultComponent {
                     = getCamelContext().getRegistry().findByType(StorageSharedKeyCredential.class);
             final Set<ClientSecretCredential> clientSecretCredentials
                     = getCamelContext().getRegistry().findByType(ClientSecretCredential.class);
+            final Set<AzureSasCredential> sasCredentials
+                    = getCamelContext().getRegistry().findByType(AzureSasCredential.class);
 
             if (storageSharedKeyCredentials.size() == 1) {
                 configuration.setSharedKeyCredential(storageSharedKeyCredentials.stream().findFirst().get());
             }
-
             if (clientSecretCredentials.size() == 1) {
                 configuration.setClientSecretCredential(clientSecretCredentials.stream().findFirst().get());
             }
-
+            if (sasCredentials.size() == 1) {
+                configuration.setSasCredential(sasCredentials.stream().findFirst().get());
+            }
         }
     }
-
 }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
index b38a36d94f4..a77a20e2c3b 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
@@ -20,6 +20,7 @@ import java.nio.file.OpenOption;
 import java.time.Duration;
 import java.util.Set;
 
+import com.azure.core.credential.AzureSasCredential;
 import com.azure.identity.ClientSecretCredential;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
@@ -42,16 +43,16 @@ public class DataLakeConfiguration implements Cloneable {
     private String directoryName;
     @UriParam(description = "name of file to be handled in component")
     private String fileName;
-    @UriParam(description = "client secret credential for authentication")
+    @UriParam(label = "security", secret = true, description = "client secret credential for authentication")
     private ClientSecretCredential clientSecretCredential;
     @UriParam(description = "datalake service client for azure storage datalake")
     @Metadata(autowired = true)
     private DataLakeServiceClient serviceClient;
-    @UriParam(description = "account key for authentication")
+    @UriParam(label = "security", secret = true, description = "account key for authentication")
     private String accountKey;
     @UriParam(description = "client id for azure account")
     private String clientId;
-    @UriParam(description = "client secret for azure account")
+    @UriParam(label = "security", secret = true, description = "client secret for azure account")
     private String clientSecret;
     @UriParam(description = "tenant id for azure account")
     private String tenantId;
@@ -93,6 +94,12 @@ public class DataLakeConfiguration implements Cloneable {
     private String umask;
     @UriParam(description = "set open options for creating file")
     private Set<OpenOption> openOptions;
+    @UriParam(label = "security", secret = true, description = "SAS token signature")
+    private String sasSignature;
+    @UriParam(label = "security", secret = true, description = "SAS token credential")
+    private AzureSasCredential sasCredential;
+    @UriParam(label = "security", secret = false, description = "Use default identity")
+    private Boolean useDefaultIdentity = false;
 
     @UriParam(label = "producer", enums = "listFileSystem, listFiles", defaultValue = "listFileSystem",
               description = "operation to be performed")
@@ -346,6 +353,30 @@ public class DataLakeConfiguration implements Cloneable {
         this.openOptions = openOptions;
     }
 
+    public String getSasSignature() {
+        return sasSignature;
+    }
+
+    public void setSasSignature(String sasSignature) {
+        this.sasSignature = sasSignature;
+    }
+
+    public AzureSasCredential getSasCredential() {
+        return sasCredential;
+    }
+
+    public void setSasCredential(AzureSasCredential sasCredential) {
+        this.sasCredential = sasCredential;
+    }
+
+    public Boolean getUseDefaultIdentity() {
+        return useDefaultIdentity;
+    }
+
+    public void setUseDefaultIdentity(Boolean useDefaultIdentity) {
+        this.useDefaultIdentity = useDefaultIdentity;
+    }
+
     public DataLakeConfiguration copy() {
         try {
             return (DataLakeConfiguration) super.clone();
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
index 4879b3dc9b0..492506a7232 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.models.AccessTier;
 import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import com.azure.storage.file.datalake.models.FileQueryError;
@@ -117,6 +118,10 @@ public class DataLakeConfigurationOptionsProxy {
         return getOption(DataLakeExchangeHeaders::getLeaseIdFromHeaders, () -> null, exchange);
     }
 
+    public Boolean getFlush(final Exchange exchange) {
+        return getOption(DataLakeExchangeHeaders::getFlushFromHeaders, () -> Boolean.FALSE, exchange);
+    }
+
     public Boolean retainUnCommitedData(final Exchange exchange) {
         return getOption(DataLakeExchangeHeaders::getRetainUncommittedDataFromHeaders, configuration::getRetainUncommitedData,
                 exchange);
@@ -236,6 +241,10 @@ public class DataLakeConfigurationOptionsProxy {
         return DataLakeExchangeHeaders.getPathHttpHeadersFromHeaders(exchange);
     }
 
+    public DataLakeFileClient getFileClient(final Exchange exchange) {
+        return DataLakeExchangeHeaders.getFileClientFromHeaders(exchange);
+    }
+
     public int getMaxRetryRequests() {
         return configuration.getMaxRetryRequests();
     }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
index 2d29ee6a018..8b1ced23421 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
@@ -204,6 +204,10 @@ public final class DataLakeConstants {
     public static final String PERMISSION = HEADER_PREFIX + "Permission";
     @Metadata(label = "from user", description = "Sets the umask for file.", javaType = "String")
     public static final String UMASK = HEADER_PREFIX + "Umask";
+    @Metadata(label = "from user", description = "Sets the file client to use", javaType = "DataLakeFileClient")
+    public static final String FILE_CLIENT = HEADER_PREFIX + "FileClient";
+    @Metadata(label = "from user", description = "Sets whether to flush on append", javaType = "Boolean")
+    public static final String FLUSH = HEADER_PREFIX + "Flush";
 
     private DataLakeConstants() {
     }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
index 591e34641e0..a6c890c1ab8 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
 
 import com.azure.core.http.HttpHeaders;
 import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.models.AccessTier;
 import com.azure.storage.file.datalake.models.ArchiveStatus;
 import com.azure.storage.file.datalake.models.CopyStatusType;
@@ -258,6 +259,14 @@ public class DataLakeExchangeHeaders {
         return getObjectFromHeaders(exchange, DataLakeConstants.UMASK, String.class);
     }
 
+    public static DataLakeFileClient getFileClientFromHeaders(final Exchange exchange) {
+        return getObjectFromHeaders(exchange, DataLakeConstants.FILE_CLIENT, DataLakeFileClient.class);
+    }
+
+    public static Boolean getFlushFromHeaders(final Exchange exchange) {
+        return getObjectFromHeaders(exchange, DataLakeConstants.FLUSH, Boolean.class);
+    }
+
     private static <T> T getObjectFromHeaders(final Exchange exchange, final String headerName, final Class<T> classType) {
         return ObjectHelper.isEmpty(exchange) ? null : exchange.getIn().getHeader(headerName, classType);
     }
@@ -415,4 +424,8 @@ public class DataLakeExchangeHeaders {
         return this;
     }
 
+    public DataLakeExchangeHeaders fileClient(final DataLakeFileClient fileClient) {
+        headers.put(DataLakeConstants.FILE_CLIENT, fileClient);
+        return this;
+    }
 }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
index ab6da91347b..47dbda393e3 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
@@ -18,36 +18,51 @@ package org.apache.camel.component.azure.storage.datalake.client;
 
 import java.util.Locale;
 
+import com.azure.core.credential.AzureSasCredential;
 import com.azure.identity.ClientSecretCredential;
 import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.DefaultAzureCredentialBuilder;
 import com.azure.storage.common.StorageSharedKeyCredential;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
 import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
 import org.apache.camel.component.azure.storage.datalake.DataLakeConfiguration;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class DataLakeClientFactory {
     private static final String SERVICE_URI_SEGMENT = ".dfs.core.windows.net";
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataLakeClientFactory.class);
+
     private DataLakeClientFactory() {
     }
 
     public static DataLakeServiceClient createDataLakeServiceClient(final DataLakeConfiguration configuration) {
         final DataLakeServiceClient client;
         if (configuration.getServiceClient() != null) {
+            LOG.trace("Using configured service client instance");
             client = configuration.getServiceClient();
+        } else if (configuration.getUseDefaultIdentity()) {
+            client = createDataLakeServiceClientWithDefaultIdentity(configuration);
         } else if (configuration.getAccountKey() != null || configuration.getSharedKeyCredential() != null) {
             client = createDataLakeServiceClientWithSharedKey(configuration);
+        } else if (configuration.getSasSignature() != null || configuration.getSasCredential() != null) {
+            client = createDataLakeServiceClientWithSas(configuration);
         } else {
             client = createDataLakeServiceClientWithClientSecret(configuration);
         }
+
         return client;
     }
 
     private static DataLakeServiceClient createDataLakeServiceClientWithSharedKey(final DataLakeConfiguration configuration) {
         StorageSharedKeyCredential sharedKeyCredential = configuration.getSharedKeyCredential();
         if (sharedKeyCredential == null) {
+            LOG.trace("Using account name and account key to instantiate service client");
             sharedKeyCredential = new StorageSharedKeyCredential(configuration.getAccountName(), configuration.getAccountKey());
+        } else {
+            LOG.trace("Using configured shared key instance to instantiate service client");
         }
 
         return new DataLakeServiceClientBuilder()
@@ -59,13 +74,15 @@ public final class DataLakeClientFactory {
     private static DataLakeServiceClient createDataLakeServiceClientWithClientSecret(
             final DataLakeConfiguration configuration) {
         ClientSecretCredential clientSecretCredential = configuration.getClientSecretCredential();
-
         if (clientSecretCredential == null) {
+            LOG.trace("Using client id, client secret, tenant id to instantiate service client");
             clientSecretCredential = new ClientSecretCredentialBuilder()
                     .clientId(configuration.getClientId())
                     .clientSecret(configuration.getClientSecret())
                     .tenantId(configuration.getTenantId())
                     .build();
+        } else {
+            LOG.trace("Using configured client secret instance to instantiate service client");
         }
 
         return new DataLakeServiceClientBuilder()
@@ -74,6 +91,35 @@ public final class DataLakeClientFactory {
                 .buildClient();
     }
 
+    private static DataLakeServiceClient createDataLakeServiceClientWithSas(
+            final DataLakeConfiguration configuration) {
+        AzureSasCredential sasCredential = configuration.getSasCredential();
+        if (sasCredential == null) {
+            LOG.trace("Using SAS signature to instantiate service client");
+            sasCredential = new AzureSasCredential(configuration.getSasSignature());
+        } else {
+            LOG.trace("Using configured SAS instance to instantiate service client");
+        }
+
+        return new DataLakeServiceClientBuilder()
+                .credential(sasCredential)
+                .endpoint(buildAzureUri(configuration))
+                .buildClient();
+    }
+
+    private static DataLakeServiceClient createDataLakeServiceClientWithDefaultIdentity(
+            final DataLakeConfiguration configuration) {
+        LOG.trace("Using default identity to instantiate service client");
+        final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+        if (configuration.getTenantId() != null) {
+            defaultAzureCredentialBuilder.tenantId(configuration.getTenantId());
+        }
+        return new DataLakeServiceClientBuilder()
+                .credential(defaultAzureCredentialBuilder.build())
+                .endpoint(buildAzureUri(configuration))
+                .buildClient();
+    }
+
     private static String buildAzureUri(final DataLakeConfiguration configuration) {
         return String.format(Locale.ROOT, "https://%s" + SERVICE_URI_SEGMENT, getAccountName(configuration));
     }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
index 36bb30010f5..c7e67aa8a11 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
@@ -34,6 +34,7 @@ import com.azure.storage.file.datalake.models.FileReadResponse;
 import com.azure.storage.file.datalake.models.PathHttpHeaders;
 import com.azure.storage.file.datalake.models.PathInfo;
 import com.azure.storage.file.datalake.models.PathProperties;
+import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
 import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
 import com.azure.storage.file.datalake.options.FileQueryOptions;
 import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
@@ -104,8 +105,8 @@ public class DataLakeFileClientWrapper {
 
     public Response<Void> appendWithResponse(
             final InputStream stream, final Long fileOffset, final Long length,
-            final byte[] contentMd5, final String leaseId, final Duration timeout) {
-        return client.appendWithResponse(stream, fileOffset, length, contentMd5, leaseId, timeout, Context.NONE);
+            final Duration timeout, final DataLakeFileAppendOptions options) {
+        return client.appendWithResponse(stream, fileOffset, length, options, timeout, Context.NONE);
     }
 
     public Response<PathInfo> uploadWithResponse(final FileParallelUploadOptions uploadOptions, final Duration timeout) {
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
index c344ace90d5..6715bec710c 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
@@ -51,8 +51,10 @@ public class DataLakeDirectoryOperations {
 
         final Response<DataLakeFileClient> response = client.createFileWithResponse(fileName, permission, umask,
                 httpHeaders, metadata, requestConditions, timeout);
-        DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create().httpHeaders(response.getHeaders());
-        return new DataLakeOperationResponse(response.getValue(), exchangeHeaders.toMap());
+        final DataLakeFileClient fileClient = response.getValue();
+        final DataLakeExchangeHeaders exchangeHeaders
+                = DataLakeExchangeHeaders.create().httpHeaders(response.getHeaders()).fileClient(fileClient);
+        return new DataLakeOperationResponse(fileClient, exchangeHeaders.toMap());
     }
 
     public DataLakeOperationResponse deleteDirectory(final Exchange exchange) {
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
index d34b036cc2e..16825067cae 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import com.azure.core.http.rest.Response;
 import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.models.AccessTier;
 import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
 import com.azure.storage.file.datalake.models.DownloadRetryOptions;
@@ -36,6 +37,7 @@ import com.azure.storage.file.datalake.models.FileReadResponse;
 import com.azure.storage.file.datalake.models.PathHttpHeaders;
 import com.azure.storage.file.datalake.models.PathInfo;
 import com.azure.storage.file.datalake.models.PathProperties;
+import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
 import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
 import com.azure.storage.file.datalake.options.FileQueryOptions;
 import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
@@ -69,8 +71,9 @@ public class DataLakeFileOperations {
             outputStream = message.getBody(OutputStream.class);
         }
 
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
         if (outputStream == null) {
-            InputStream fileInputStream = client.openInputStream();
+            InputStream fileInputStream = fileClientWrapper.openInputStream();
             return new DataLakeOperationResponse(fileInputStream);
         }
 
@@ -80,9 +83,10 @@ public class DataLakeFileOperations {
         final DownloadRetryOptions downloadRetryOptions = getDownloadRetryOptions(configurationProxy);
 
         try {
-            final FileReadResponse readResponse = client.downloadWithResponse(outputStream, fileRange, downloadRetryOptions,
-                    fileCommonRequestOptions.getRequestConditions(), fileCommonRequestOptions.getContentMD5() != null,
-                    fileCommonRequestOptions.getTimeout());
+            final FileReadResponse readResponse
+                    = fileClientWrapper.downloadWithResponse(outputStream, fileRange, downloadRetryOptions,
+                            fileCommonRequestOptions.getRequestConditions(), fileCommonRequestOptions.getContentMD5() != null,
+                            fileCommonRequestOptions.getTimeout());
 
             final DataLakeExchangeHeaders dataLakeExchangeHeaders = DataLakeExchangeHeaders
                     .createDataLakeExchangeHeadersFromFileReadHeaders(readResponse.getDeserializedHeaders())
@@ -101,17 +105,17 @@ public class DataLakeFileOperations {
         if (ObjectHelper.isEmpty(fileDir)) {
             throw new IllegalArgumentException("to download a file, you need to specify the fileDir in the URI");
         }
-
-        final File recieverFile = new File(fileDir, client.getFileName());
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        final File recieverFile = new File(fileDir, fileClientWrapper.getFileName());
         final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
         final FileRange fileRange = configurationProxy.getFileRange(exchange);
         final ParallelTransferOptions parallelTransferOptions = configurationProxy.getParallelTransferOptions(exchange);
         final DownloadRetryOptions downloadRetryOptions = getDownloadRetryOptions(configurationProxy);
         final Set<OpenOption> openOptions = configurationProxy.getOpenOptions(exchange);
-
-        final Response<PathProperties> response = client.downloadToFileWithResponse(recieverFile.toString(), fileRange,
-                parallelTransferOptions, downloadRetryOptions, commonRequestOptions.getRequestConditions(),
-                commonRequestOptions.getContentMD5() != null, openOptions, commonRequestOptions.getTimeout());
+        final Response<PathProperties> response
+                = fileClientWrapper.downloadToFileWithResponse(recieverFile.toString(), fileRange,
+                        parallelTransferOptions, downloadRetryOptions, commonRequestOptions.getRequestConditions(),
+                        commonRequestOptions.getContentMD5() != null, openOptions, commonRequestOptions.getTimeout());
         final DataLakeExchangeHeaders exchangeHeaders
                 = DataLakeExchangeHeaders.createDataLakeExchangeHeadersFromPathProperties(response.getValue())
                         .httpHeaders(response.getHeaders())
@@ -135,14 +139,17 @@ public class DataLakeFileOperations {
 
         final DataLakeServiceSasSignatureValues serviceSasSignatureValues
                 = new DataLakeServiceSasSignatureValues(offsetDateTimeToSet, sasPermission);
-        final String url = client.getFileUrl() + "?" + client.generateSas(serviceSasSignatureValues);
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        final String url = fileClientWrapper.getFileUrl() + "?" + fileClientWrapper.generateSas(serviceSasSignatureValues);
         final DataLakeExchangeHeaders headers = DataLakeExchangeHeaders.create().downloadLink(url);
         return new DataLakeOperationResponse(url, headers.toMap());
     }
 
     public DataLakeOperationResponse deleteFile(final Exchange exchange) {
         final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
-        Response<Void> response = client.delete(commonRequestOptions.getRequestConditions(), commonRequestOptions.getTimeout());
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        Response<Void> response
+                = fileClientWrapper.delete(commonRequestOptions.getRequestConditions(), commonRequestOptions.getTimeout());
         DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
         exchangeHeaders.httpHeaders(response.getHeaders());
         return new DataLakeOperationResponse(true, exchangeHeaders.toMap());
@@ -151,15 +158,19 @@ public class DataLakeFileOperations {
     public DataLakeOperationResponse appendToFile(final Exchange exchange) throws IOException {
         final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
         final FileStreamAndLength fileStreamAndLength = FileStreamAndLength.createFileStreamAndLengthFromExchangeBody(exchange);
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
         final Long fileOffset;
         if (configurationProxy.getFileOffset(exchange) == null) {
-            fileOffset = client.getFileSize();
+            fileOffset = fileClientWrapper.getFileSize();
         } else {
             fileOffset = configurationProxy.getFileOffset(exchange);
         }
-        final Response<Void> response = client.appendWithResponse(fileStreamAndLength.getInputStream(), fileOffset,
-                fileStreamAndLength.getStreamLength(), commonRequestOptions.getContentMD5(), commonRequestOptions.getLeaseId(),
-                commonRequestOptions.getTimeout());
+        final DataLakeFileAppendOptions options = new DataLakeFileAppendOptions();
+        options.setContentHash(commonRequestOptions.getContentMD5());
+        options.setLeaseId(commonRequestOptions.getLeaseId());
+        options.setFlush(configurationProxy.getFlush(exchange));
+        final Response<Void> response = fileClientWrapper.appendWithResponse(fileStreamAndLength.getInputStream(), fileOffset,
+                fileStreamAndLength.getStreamLength(), commonRequestOptions.getTimeout(), options);
         DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
         exchangeHeaders.httpHeaders(response.getHeaders());
         return new DataLakeOperationResponse(true, exchangeHeaders.toMap());
@@ -170,9 +181,9 @@ public class DataLakeFileOperations {
         final Long position = configurationProxy.getPosition(exchange);
         final Boolean retainUncommitedData = configurationProxy.retainUnCommitedData(exchange);
         final Boolean close = configurationProxy.getClose(exchange);
-
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
         final Response<PathInfo> response
-                = client.flushWithResponse(position + client.getFileSize(), retainUncommitedData, close,
+                = fileClientWrapper.flushWithResponse(position + fileClientWrapper.getFileSize(), retainUncommitedData, close,
                         commonRequestOptions.getPathHttpHeaders(), commonRequestOptions.getRequestConditions(),
                         commonRequestOptions.getTimeout());
         DataLakeExchangeHeaders exchangeHeaders
@@ -185,7 +196,8 @@ public class DataLakeFileOperations {
         final String path = configurationProxy.getPath(exchange);
         final ParallelTransferOptions transferOptions = configurationProxy.getParallelTransferOptions(exchange);
         final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
-        client.uploadFromFile(path, transferOptions, commonRequestOptions.getPathHttpHeaders(),
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        fileClientWrapper.uploadFromFile(path, transferOptions, commonRequestOptions.getPathHttpHeaders(),
                 commonRequestOptions.getMetadata(), commonRequestOptions.getRequestConditions(),
                 commonRequestOptions.getTimeout());
         return new DataLakeOperationResponse(true);
@@ -204,8 +216,9 @@ public class DataLakeFileOperations {
                         .setMetadata(commonRequestOptions.getMetadata()).setPermissions(permission)
                         .setRequestConditions(commonRequestOptions.getRequestConditions())
                         .setRequestConditions(commonRequestOptions.getRequestConditions()).setUmask(umask);
-
-        final Response<PathInfo> response = client.uploadWithResponse(uploadOptions, commonRequestOptions.getTimeout());
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        final Response<PathInfo> response
+                = fileClientWrapper.uploadWithResponse(uploadOptions, commonRequestOptions.getTimeout());
         DataLakeExchangeHeaders exchangeHeaders
                 = DataLakeExchangeHeaders.createDataLakeExchangeHeadersFromPathInfo(response.getValue())
                         .httpHeaders(response.getHeaders());
@@ -214,7 +227,8 @@ public class DataLakeFileOperations {
 
     public DataLakeOperationResponse openQueryInputStream(final Exchange exchange) {
         FileQueryOptions queryOptions = configurationProxy.getFileQueryOptions(exchange);
-        final Response<InputStream> response = client.openQueryInputStreamWithResponse(queryOptions);
+        final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+        final Response<InputStream> response = fileClientWrapper.openQueryInputStreamWithResponse(queryOptions);
         DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
         exchangeHeaders.httpHeaders(response.getHeaders());
         return new DataLakeOperationResponse(response.getValue(), exchangeHeaders.toMap());
@@ -234,4 +248,9 @@ public class DataLakeFileOperations {
     private DownloadRetryOptions getDownloadRetryOptions(final DataLakeConfigurationOptionsProxy proxy) {
         return new DownloadRetryOptions().setMaxRetryRequests(proxy.getMaxRetryRequests());
     }
+
+    private DataLakeFileClientWrapper getFileClientWrapper(final Exchange exchange) {
+        final DataLakeFileClient override = configurationProxy.getFileClient(exchange); // may be null
+        return override != null ? new DataLakeFileClientWrapper(override) : client; // else use the default wrapper
+    }
 }
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java b/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
index d55c71307b4..0c3380d43b7 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
@@ -26,7 +26,9 @@ import java.time.OffsetDateTime;
 
 import com.azure.core.http.HttpHeaders;
 import com.azure.core.http.rest.ResponseBase;
+import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.models.PathInfo;
+import com.azure.storage.file.datalake.models.PathProperties;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.azure.storage.datalake.DataLakeConfiguration;
 import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
@@ -44,6 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -55,6 +59,9 @@ public class DataLakeFileOperationTest extends CamelTestSupport {
     @Mock
     private DataLakeFileClientWrapper client;
 
+    @Mock
+    private DataLakeFileClient alternateClient;
+
     @BeforeEach
     public void setup() {
         configuration = new DataLakeConfiguration();
@@ -107,4 +114,25 @@ public class DataLakeFileOperationTest extends CamelTestSupport {
         assertEquals(time, response.getHeaders().get(DataLakeConstants.LAST_MODIFIED));
     }
 
+    @Test
+    void testServiceClientOverride() throws Exception {
+        // Stub the alternate client: it reports the payload length as file size and accepts the append.
+        final byte[] payload = "testing".getBytes(Charset.defaultCharset());
+        final PathProperties pathProperties = mock(PathProperties.class);
+        when(pathProperties.getFileSize()).thenReturn(Long.valueOf(payload.length));
+        when(alternateClient.getProperties()).thenReturn(pathProperties);
+        final HttpHeaders responseHeaders = new HttpHeaders();
+        when(alternateClient.appendWithResponse(any(), anyLong(), anyLong(), any(), any(), any()))
+                .thenReturn(new ResponseBase<>(null, 200, responseHeaders, null, null));
+        // Hand the alternate client to the operation through the FILE_CLIENT exchange header.
+        final Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(new ByteArrayInputStream(payload));
+        exchange.getIn().setHeader(DataLakeConstants.FILE_CLIENT, alternateClient);
+        final DataLakeFileOperations fileOperations = new DataLakeFileOperations(configuration, client);
+        final DataLakeOperationResponse result = fileOperations.appendToFile(exchange);
+        // The append must yield a response with a true body and a header map.
+        assertNotNull(result);
+        assertTrue((boolean) result.getBody());
+        assertNotNull(result.getHeaders());
+    }
 }