You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/03/22 12:46:18 UTC
[camel] 01/01: Azure Data Lake - CAMEL-19152 additional file client header field + CAMEL-18636 additional auth options (#9567)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch azure-datalake-backport
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 80c649eecf559da0f01a56fef95d85e4b0f6a31d
Author: Andreas Klug <an...@de.bosch.com>
AuthorDate: Mon Mar 20 07:57:02 2023 +0100
Azure Data Lake - CAMEL-19152 additional file client header field + CAMEL-18636 additional auth options (#9567)
* CAMEL-19152 - Add file client header and flush option
* CAMEL-18636: Allow string based auth + additional auth methods
* CAMEL-18636: Add security label to config parameters
* CAMEL-18636: Fixed setting SAS credentials from registry
* camel-jbang - Fix NPE
* CAMEL-19152 - Add file client header and flush option
* CAMEL-18636: rebase - removed validate method from component
* CAMEL-18636: Add security label to config parameters
* CAMEL-18636: Fixed setting SAS credentials from registry
---------
Co-authored-by: Andreas Klug (BD/XDE1) <KG...@fe-c-014an.fritz.box>
Co-authored-by: Claus Ibsen <cl...@gmail.com>
---
.../azure/storage/datalake/DataLakeComponent.java | 9 ++--
.../storage/datalake/DataLakeConfiguration.java | 37 +++++++++++--
.../DataLakeConfigurationOptionsProxy.java | 9 ++++
.../azure/storage/datalake/DataLakeConstants.java | 4 ++
.../storage/datalake/DataLakeExchangeHeaders.java | 13 +++++
.../datalake/client/DataLakeClientFactory.java | 48 ++++++++++++++++-
.../datalake/client/DataLakeFileClientWrapper.java | 5 +-
.../operations/DataLakeDirectoryOperations.java | 6 ++-
.../operations/DataLakeFileOperations.java | 63 ++++++++++++++--------
.../operations/DataLakeFileOperationTest.java | 28 ++++++++++
10 files changed, 189 insertions(+), 33 deletions(-)
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
index 7e6178a86b6..aae9545d02a 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeComponent.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.azure.storage.datalake;
import java.util.Map;
import java.util.Set;
+import com.azure.core.credential.AzureSasCredential;
import com.azure.identity.ClientSecretCredential;
import com.azure.storage.common.StorageSharedKeyCredential;
import org.apache.camel.CamelContext;
@@ -85,16 +86,18 @@ public class DataLakeComponent extends DefaultComponent {
= getCamelContext().getRegistry().findByType(StorageSharedKeyCredential.class);
final Set<ClientSecretCredential> clientSecretCredentials
= getCamelContext().getRegistry().findByType(ClientSecretCredential.class);
+ final Set<AzureSasCredential> sasCredentials
+ = getCamelContext().getRegistry().findByType(AzureSasCredential.class);
if (storageSharedKeyCredentials.size() == 1) {
configuration.setSharedKeyCredential(storageSharedKeyCredentials.stream().findFirst().get());
}
-
if (clientSecretCredentials.size() == 1) {
configuration.setClientSecretCredential(clientSecretCredentials.stream().findFirst().get());
}
-
+ if (sasCredentials.size() == 1) {
+ configuration.setSasCredential(sasCredentials.stream().findFirst().get());
+ }
}
}
-
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
index b38a36d94f4..a77a20e2c3b 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfiguration.java
@@ -20,6 +20,7 @@ import java.nio.file.OpenOption;
import java.time.Duration;
import java.util.Set;
+import com.azure.core.credential.AzureSasCredential;
import com.azure.identity.ClientSecretCredential;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
@@ -42,16 +43,16 @@ public class DataLakeConfiguration implements Cloneable {
private String directoryName;
@UriParam(description = "name of file to be handled in component")
private String fileName;
- @UriParam(description = "client secret credential for authentication")
+ @UriParam(label = "security", secret = true, description = "client secret credential for authentication")
private ClientSecretCredential clientSecretCredential;
@UriParam(description = "datalake service client for azure storage datalake")
@Metadata(autowired = true)
private DataLakeServiceClient serviceClient;
- @UriParam(description = "account key for authentication")
+ @UriParam(label = "security", secret = true, description = "account key for authentication")
private String accountKey;
@UriParam(description = "client id for azure account")
private String clientId;
- @UriParam(description = "client secret for azure account")
+ @UriParam(label = "security", secret = true, description = "client secret for azure account")
private String clientSecret;
@UriParam(description = "tenant id for azure account")
private String tenantId;
@@ -93,6 +94,12 @@ public class DataLakeConfiguration implements Cloneable {
private String umask;
@UriParam(description = "set open options for creating file")
private Set<OpenOption> openOptions;
+ @UriParam(label = "security", secret = true, description = "SAS token signature")
+ private String sasSignature;
+ @UriParam(label = "security", secret = true, description = "SAS token credential")
+ private AzureSasCredential sasCredential;
+ @UriParam(label = "security", secret = false, description = "Use default identity")
+ private Boolean useDefaultIdentity = false;
@UriParam(label = "producer", enums = "listFileSystem, listFiles", defaultValue = "listFileSystem",
description = "operation to be performed")
@@ -346,6 +353,30 @@ public class DataLakeConfiguration implements Cloneable {
this.openOptions = openOptions;
}
+ public String getSasSignature() {
+ return sasSignature;
+ }
+
+ public void setSasSignature(String sasSignature) {
+ this.sasSignature = sasSignature;
+ }
+
+ public AzureSasCredential getSasCredential() {
+ return sasCredential;
+ }
+
+ public void setSasCredential(AzureSasCredential sasCredential) {
+ this.sasCredential = sasCredential;
+ }
+
+ public Boolean getUseDefaultIdentity() {
+ return useDefaultIdentity;
+ }
+
+ public void setUseDefaultIdentity(Boolean useDefaultIdentity) {
+ this.useDefaultIdentity = useDefaultIdentity;
+ }
+
public DataLakeConfiguration copy() {
try {
return (DataLakeConfiguration) super.clone();
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
index 4879b3dc9b0..492506a7232 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConfigurationOptionsProxy.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.AccessTier;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.FileQueryError;
@@ -117,6 +118,10 @@ public class DataLakeConfigurationOptionsProxy {
return getOption(DataLakeExchangeHeaders::getLeaseIdFromHeaders, () -> null, exchange);
}
+ public Boolean getFlush(final Exchange exchange) {
+ return getOption(DataLakeExchangeHeaders::getFlushFromHeaders, () -> Boolean.FALSE, exchange);
+ }
+
public Boolean retainUnCommitedData(final Exchange exchange) {
return getOption(DataLakeExchangeHeaders::getRetainUncommittedDataFromHeaders, configuration::getRetainUncommitedData,
exchange);
@@ -236,6 +241,10 @@ public class DataLakeConfigurationOptionsProxy {
return DataLakeExchangeHeaders.getPathHttpHeadersFromHeaders(exchange);
}
+ public DataLakeFileClient getFileClient(final Exchange exchange) {
+ return DataLakeExchangeHeaders.getFileClientFromHeaders(exchange);
+ }
+
public int getMaxRetryRequests() {
return configuration.getMaxRetryRequests();
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
index 2d29ee6a018..8b1ced23421 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeConstants.java
@@ -204,6 +204,10 @@ public final class DataLakeConstants {
public static final String PERMISSION = HEADER_PREFIX + "Permission";
@Metadata(label = "from user", description = "Sets the umask for file.", javaType = "String")
public static final String UMASK = HEADER_PREFIX + "Umask";
+ @Metadata(label = "from user", description = "Sets the file client to use", javaType = "DataLakeFileClient")
+ public static final String FILE_CLIENT = HEADER_PREFIX + "FileClient";
+ @Metadata(label = "from user", description = "Sets whether to flush on append", javaType = "Boolean")
+ public static final String FLUSH = HEADER_PREFIX + "Flush";
private DataLakeConstants() {
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
index 591e34641e0..a6c890c1ab8 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/DataLakeExchangeHeaders.java
@@ -26,6 +26,7 @@ import java.util.function.Consumer;
import com.azure.core.http.HttpHeaders;
import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.AccessTier;
import com.azure.storage.file.datalake.models.ArchiveStatus;
import com.azure.storage.file.datalake.models.CopyStatusType;
@@ -258,6 +259,14 @@ public class DataLakeExchangeHeaders {
return getObjectFromHeaders(exchange, DataLakeConstants.UMASK, String.class);
}
+ public static DataLakeFileClient getFileClientFromHeaders(final Exchange exchange) {
+ return getObjectFromHeaders(exchange, DataLakeConstants.FILE_CLIENT, DataLakeFileClient.class);
+ }
+
+ public static Boolean getFlushFromHeaders(final Exchange exchange) {
+ return getObjectFromHeaders(exchange, DataLakeConstants.FLUSH, Boolean.class);
+ }
+
private static <T> T getObjectFromHeaders(final Exchange exchange, final String headerName, final Class<T> classType) {
return ObjectHelper.isEmpty(exchange) ? null : exchange.getIn().getHeader(headerName, classType);
}
@@ -415,4 +424,8 @@ public class DataLakeExchangeHeaders {
return this;
}
+ public DataLakeExchangeHeaders fileClient(final DataLakeFileClient fileClient) {
+ headers.put(DataLakeConstants.FILE_CLIENT, fileClient);
+ return this;
+ }
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
index ab6da91347b..47dbda393e3 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeClientFactory.java
@@ -18,36 +18,51 @@ package org.apache.camel.component.azure.storage.datalake.client;
import java.util.Locale;
+import com.azure.core.credential.AzureSasCredential;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
+import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import org.apache.camel.component.azure.storage.datalake.DataLakeConfiguration;
import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class DataLakeClientFactory {
private static final String SERVICE_URI_SEGMENT = ".dfs.core.windows.net";
+ private static final Logger LOG = LoggerFactory.getLogger(DataLakeClientFactory.class);
+
private DataLakeClientFactory() {
}
public static DataLakeServiceClient createDataLakeServiceClient(final DataLakeConfiguration configuration) {
final DataLakeServiceClient client;
if (configuration.getServiceClient() != null) {
+ LOG.trace("Using configured service client instance");
client = configuration.getServiceClient();
+ } else if (configuration.getUseDefaultIdentity()) {
+ client = createDataLakeServiceClientWithDefaultIdentity(configuration);
} else if (configuration.getAccountKey() != null || configuration.getSharedKeyCredential() != null) {
client = createDataLakeServiceClientWithSharedKey(configuration);
+ } else if (configuration.getSasSignature() != null || configuration.getSasCredential() != null) {
+ client = createDataLakeServiceClientWithSas(configuration);
} else {
client = createDataLakeServiceClientWithClientSecret(configuration);
}
+
return client;
}
private static DataLakeServiceClient createDataLakeServiceClientWithSharedKey(final DataLakeConfiguration configuration) {
StorageSharedKeyCredential sharedKeyCredential = configuration.getSharedKeyCredential();
if (sharedKeyCredential == null) {
+ LOG.trace("Using account name and account key to instantiate service client");
sharedKeyCredential = new StorageSharedKeyCredential(configuration.getAccountName(), configuration.getAccountKey());
+ } else {
+ LOG.trace("Using configured shared key instance to instantiate service client");
}
return new DataLakeServiceClientBuilder()
@@ -59,13 +74,15 @@ public final class DataLakeClientFactory {
private static DataLakeServiceClient createDataLakeServiceClientWithClientSecret(
final DataLakeConfiguration configuration) {
ClientSecretCredential clientSecretCredential = configuration.getClientSecretCredential();
-
if (clientSecretCredential == null) {
+ LOG.trace("Using client id, client secret, tenant id to instantiate service client");
clientSecretCredential = new ClientSecretCredentialBuilder()
.clientId(configuration.getClientId())
.clientSecret(configuration.getClientSecret())
.tenantId(configuration.getTenantId())
.build();
+ } else {
+ LOG.trace("Using configured client secret instance to instantiate service client");
}
return new DataLakeServiceClientBuilder()
@@ -74,6 +91,35 @@ public final class DataLakeClientFactory {
.buildClient();
}
+ private static DataLakeServiceClient createDataLakeServiceClientWithSas(
+ final DataLakeConfiguration configuration) {
+ AzureSasCredential sasCredential = configuration.getSasCredential();
+ if (sasCredential == null) {
+ LOG.trace("Using SAS signature to instantiate service client");
+ sasCredential = new AzureSasCredential(configuration.getSasSignature());
+ } else {
+ LOG.trace("Using configured SAS instance to instantiate service client");
+ }
+
+ return new DataLakeServiceClientBuilder()
+ .credential(sasCredential)
+ .endpoint(buildAzureUri(configuration))
+ .buildClient();
+ }
+
+ private static DataLakeServiceClient createDataLakeServiceClientWithDefaultIdentity(
+ final DataLakeConfiguration configuration) {
+ LOG.trace("Using default identity to instantiate service client");
+ final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+ if (configuration.getTenantId() != null) {
+ defaultAzureCredentialBuilder.tenantId(configuration.getTenantId());
+ }
+ return new DataLakeServiceClientBuilder()
+ .credential(defaultAzureCredentialBuilder.build())
+ .endpoint(buildAzureUri(configuration))
+ .buildClient();
+ }
+
private static String buildAzureUri(final DataLakeConfiguration configuration) {
return String.format(Locale.ROOT, "https://%s" + SERVICE_URI_SEGMENT, getAccountName(configuration));
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
index 36bb30010f5..c7e67aa8a11 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/client/DataLakeFileClientWrapper.java
@@ -34,6 +34,7 @@ import com.azure.storage.file.datalake.models.FileReadResponse;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
+import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
@@ -104,8 +105,8 @@ public class DataLakeFileClientWrapper {
public Response<Void> appendWithResponse(
final InputStream stream, final Long fileOffset, final Long length,
- final byte[] contentMd5, final String leaseId, final Duration timeout) {
- return client.appendWithResponse(stream, fileOffset, length, contentMd5, leaseId, timeout, Context.NONE);
+ final Duration timeout, final DataLakeFileAppendOptions options) {
+ return client.appendWithResponse(stream, fileOffset, length, options, timeout, Context.NONE);
}
public Response<PathInfo> uploadWithResponse(final FileParallelUploadOptions uploadOptions, final Duration timeout) {
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
index c344ace90d5..6715bec710c 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeDirectoryOperations.java
@@ -51,8 +51,10 @@ public class DataLakeDirectoryOperations {
final Response<DataLakeFileClient> response = client.createFileWithResponse(fileName, permission, umask,
httpHeaders, metadata, requestConditions, timeout);
- DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create().httpHeaders(response.getHeaders());
- return new DataLakeOperationResponse(response.getValue(), exchangeHeaders.toMap());
+ final DataLakeFileClient fileClient = response.getValue();
+ final DataLakeExchangeHeaders exchangeHeaders
+ = DataLakeExchangeHeaders.create().httpHeaders(response.getHeaders()).fileClient(fileClient);
+ return new DataLakeOperationResponse(fileClient, exchangeHeaders.toMap());
}
public DataLakeOperationResponse deleteDirectory(final Exchange exchange) {
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
index d34b036cc2e..16825067cae 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/main/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperations.java
@@ -28,6 +28,7 @@ import java.util.Set;
import com.azure.core.http.rest.Response;
import com.azure.storage.common.ParallelTransferOptions;
+import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.AccessTier;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
@@ -36,6 +37,7 @@ import com.azure.storage.file.datalake.models.FileReadResponse;
import com.azure.storage.file.datalake.models.PathHttpHeaders;
import com.azure.storage.file.datalake.models.PathInfo;
import com.azure.storage.file.datalake.models.PathProperties;
+import com.azure.storage.file.datalake.options.DataLakeFileAppendOptions;
import com.azure.storage.file.datalake.options.FileParallelUploadOptions;
import com.azure.storage.file.datalake.options.FileQueryOptions;
import com.azure.storage.file.datalake.sas.DataLakeServiceSasSignatureValues;
@@ -69,8 +71,9 @@ public class DataLakeFileOperations {
outputStream = message.getBody(OutputStream.class);
}
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
if (outputStream == null) {
- InputStream fileInputStream = client.openInputStream();
+ InputStream fileInputStream = fileClientWrapper.openInputStream();
return new DataLakeOperationResponse(fileInputStream);
}
@@ -80,9 +83,10 @@ public class DataLakeFileOperations {
final DownloadRetryOptions downloadRetryOptions = getDownloadRetryOptions(configurationProxy);
try {
- final FileReadResponse readResponse = client.downloadWithResponse(outputStream, fileRange, downloadRetryOptions,
- fileCommonRequestOptions.getRequestConditions(), fileCommonRequestOptions.getContentMD5() != null,
- fileCommonRequestOptions.getTimeout());
+ final FileReadResponse readResponse
+ = fileClientWrapper.downloadWithResponse(outputStream, fileRange, downloadRetryOptions,
+ fileCommonRequestOptions.getRequestConditions(), fileCommonRequestOptions.getContentMD5() != null,
+ fileCommonRequestOptions.getTimeout());
final DataLakeExchangeHeaders dataLakeExchangeHeaders = DataLakeExchangeHeaders
.createDataLakeExchangeHeadersFromFileReadHeaders(readResponse.getDeserializedHeaders())
@@ -101,17 +105,17 @@ public class DataLakeFileOperations {
if (ObjectHelper.isEmpty(fileDir)) {
throw new IllegalArgumentException("to download a file, you need to specify the fileDir in the URI");
}
-
- final File recieverFile = new File(fileDir, client.getFileName());
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ final File recieverFile = new File(fileDir, fileClientWrapper.getFileName());
final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
final FileRange fileRange = configurationProxy.getFileRange(exchange);
final ParallelTransferOptions parallelTransferOptions = configurationProxy.getParallelTransferOptions(exchange);
final DownloadRetryOptions downloadRetryOptions = getDownloadRetryOptions(configurationProxy);
final Set<OpenOption> openOptions = configurationProxy.getOpenOptions(exchange);
-
- final Response<PathProperties> response = client.downloadToFileWithResponse(recieverFile.toString(), fileRange,
- parallelTransferOptions, downloadRetryOptions, commonRequestOptions.getRequestConditions(),
- commonRequestOptions.getContentMD5() != null, openOptions, commonRequestOptions.getTimeout());
+ final Response<PathProperties> response
+ = fileClientWrapper.downloadToFileWithResponse(recieverFile.toString(), fileRange,
+ parallelTransferOptions, downloadRetryOptions, commonRequestOptions.getRequestConditions(),
+ commonRequestOptions.getContentMD5() != null, openOptions, commonRequestOptions.getTimeout());
final DataLakeExchangeHeaders exchangeHeaders
= DataLakeExchangeHeaders.createDataLakeExchangeHeadersFromPathProperties(response.getValue())
.httpHeaders(response.getHeaders())
@@ -135,14 +139,17 @@ public class DataLakeFileOperations {
final DataLakeServiceSasSignatureValues serviceSasSignatureValues
= new DataLakeServiceSasSignatureValues(offsetDateTimeToSet, sasPermission);
- final String url = client.getFileUrl() + "?" + client.generateSas(serviceSasSignatureValues);
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ final String url = fileClientWrapper.getFileUrl() + "?" + fileClientWrapper.generateSas(serviceSasSignatureValues);
final DataLakeExchangeHeaders headers = DataLakeExchangeHeaders.create().downloadLink(url);
return new DataLakeOperationResponse(url, headers.toMap());
}
public DataLakeOperationResponse deleteFile(final Exchange exchange) {
final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
- Response<Void> response = client.delete(commonRequestOptions.getRequestConditions(), commonRequestOptions.getTimeout());
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ Response<Void> response
+ = fileClientWrapper.delete(commonRequestOptions.getRequestConditions(), commonRequestOptions.getTimeout());
DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
exchangeHeaders.httpHeaders(response.getHeaders());
return new DataLakeOperationResponse(true, exchangeHeaders.toMap());
@@ -151,15 +158,19 @@ public class DataLakeFileOperations {
public DataLakeOperationResponse appendToFile(final Exchange exchange) throws IOException {
final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
final FileStreamAndLength fileStreamAndLength = FileStreamAndLength.createFileStreamAndLengthFromExchangeBody(exchange);
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
final Long fileOffset;
if (configurationProxy.getFileOffset(exchange) == null) {
- fileOffset = client.getFileSize();
+ fileOffset = fileClientWrapper.getFileSize();
} else {
fileOffset = configurationProxy.getFileOffset(exchange);
}
- final Response<Void> response = client.appendWithResponse(fileStreamAndLength.getInputStream(), fileOffset,
- fileStreamAndLength.getStreamLength(), commonRequestOptions.getContentMD5(), commonRequestOptions.getLeaseId(),
- commonRequestOptions.getTimeout());
+ final DataLakeFileAppendOptions options = new DataLakeFileAppendOptions();
+ options.setContentHash(commonRequestOptions.getContentMD5());
+ options.setLeaseId(commonRequestOptions.getLeaseId());
+ options.setFlush(configurationProxy.getFlush(exchange));
+ final Response<Void> response = fileClientWrapper.appendWithResponse(fileStreamAndLength.getInputStream(), fileOffset,
+ fileStreamAndLength.getStreamLength(), commonRequestOptions.getTimeout(), options);
DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
exchangeHeaders.httpHeaders(response.getHeaders());
return new DataLakeOperationResponse(true, exchangeHeaders.toMap());
@@ -170,9 +181,9 @@ public class DataLakeFileOperations {
final Long position = configurationProxy.getPosition(exchange);
final Boolean retainUncommitedData = configurationProxy.retainUnCommitedData(exchange);
final Boolean close = configurationProxy.getClose(exchange);
-
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
final Response<PathInfo> response
- = client.flushWithResponse(position + client.getFileSize(), retainUncommitedData, close,
+ = fileClientWrapper.flushWithResponse(position + fileClientWrapper.getFileSize(), retainUncommitedData, close,
commonRequestOptions.getPathHttpHeaders(), commonRequestOptions.getRequestConditions(),
commonRequestOptions.getTimeout());
DataLakeExchangeHeaders exchangeHeaders
@@ -185,7 +196,8 @@ public class DataLakeFileOperations {
final String path = configurationProxy.getPath(exchange);
final ParallelTransferOptions transferOptions = configurationProxy.getParallelTransferOptions(exchange);
final FileCommonRequestOptions commonRequestOptions = getCommonRequestOptions(exchange);
- client.uploadFromFile(path, transferOptions, commonRequestOptions.getPathHttpHeaders(),
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ fileClientWrapper.uploadFromFile(path, transferOptions, commonRequestOptions.getPathHttpHeaders(),
commonRequestOptions.getMetadata(), commonRequestOptions.getRequestConditions(),
commonRequestOptions.getTimeout());
return new DataLakeOperationResponse(true);
@@ -204,8 +216,9 @@ public class DataLakeFileOperations {
.setMetadata(commonRequestOptions.getMetadata()).setPermissions(permission)
.setRequestConditions(commonRequestOptions.getRequestConditions())
.setRequestConditions(commonRequestOptions.getRequestConditions()).setUmask(umask);
-
- final Response<PathInfo> response = client.uploadWithResponse(uploadOptions, commonRequestOptions.getTimeout());
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ final Response<PathInfo> response
+ = fileClientWrapper.uploadWithResponse(uploadOptions, commonRequestOptions.getTimeout());
DataLakeExchangeHeaders exchangeHeaders
= DataLakeExchangeHeaders.createDataLakeExchangeHeadersFromPathInfo(response.getValue())
.httpHeaders(response.getHeaders());
@@ -214,7 +227,8 @@ public class DataLakeFileOperations {
public DataLakeOperationResponse openQueryInputStream(final Exchange exchange) {
FileQueryOptions queryOptions = configurationProxy.getFileQueryOptions(exchange);
- final Response<InputStream> response = client.openQueryInputStreamWithResponse(queryOptions);
+ final DataLakeFileClientWrapper fileClientWrapper = getFileClientWrapper(exchange);
+ final Response<InputStream> response = fileClientWrapper.openQueryInputStreamWithResponse(queryOptions);
DataLakeExchangeHeaders exchangeHeaders = DataLakeExchangeHeaders.create();
exchangeHeaders.httpHeaders(response.getHeaders());
return new DataLakeOperationResponse(response.getValue(), exchangeHeaders.toMap());
@@ -234,4 +248,9 @@ public class DataLakeFileOperations {
private DownloadRetryOptions getDownloadRetryOptions(final DataLakeConfigurationOptionsProxy proxy) {
return new DownloadRetryOptions().setMaxRetryRequests(proxy.getMaxRetryRequests());
}
+
+ private DataLakeFileClientWrapper getFileClientWrapper(final Exchange exchange) {
+ final DataLakeFileClient fileClient = configurationProxy.getFileClient(exchange);
+ return null == fileClient ? client : new DataLakeFileClientWrapper(fileClient);
+ }
}
diff --git a/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java b/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
index d55c71307b4..0c3380d43b7 100644
--- a/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
+++ b/components/camel-azure/camel-azure-storage-datalake/src/test/java/org/apache/camel/component/azure/storage/datalake/operations/DataLakeFileOperationTest.java
@@ -26,7 +26,9 @@ import java.time.OffsetDateTime;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.rest.ResponseBase;
+import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.models.PathInfo;
+import com.azure.storage.file.datalake.models.PathProperties;
import org.apache.camel.Exchange;
import org.apache.camel.component.azure.storage.datalake.DataLakeConfiguration;
import org.apache.camel.component.azure.storage.datalake.DataLakeConstants;
@@ -44,6 +46,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -55,6 +59,9 @@ public class DataLakeFileOperationTest extends CamelTestSupport {
@Mock
private DataLakeFileClientWrapper client;
+ @Mock
+ private DataLakeFileClient alternateClient;
+
@BeforeEach
public void setup() {
configuration = new DataLakeConfiguration();
@@ -107,4 +114,25 @@ public class DataLakeFileOperationTest extends CamelTestSupport {
assertEquals(time, response.getHeaders().get(DataLakeConstants.LAST_MODIFIED));
}
+ @Test
+ void testServiceClientOverride() throws Exception {
+ final HttpHeaders httpHeaders = new HttpHeaders();
+ final byte[] testing = "testing".getBytes(Charset.defaultCharset());
+ when(alternateClient.appendWithResponse(any(), anyLong(), anyLong(), any(), any(), any()))
+ .thenReturn(new ResponseBase<>(null, 200, httpHeaders, null, null));
+ final PathProperties properties = mock(PathProperties.class);
+ when(properties.getFileSize()).thenReturn(Long.valueOf(testing.length));
+ when(alternateClient.getProperties()).thenReturn(properties);
+
+ final Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setHeader(DataLakeConstants.FILE_CLIENT, alternateClient);
+ exchange.getIn().setBody(new ByteArrayInputStream(testing));
+
+ final DataLakeFileOperations operations = new DataLakeFileOperations(configuration, client);
+ final DataLakeOperationResponse response = operations.appendToFile(exchange);
+
+ assertNotNull(response);
+ assertTrue((boolean) response.getBody());
+ assertNotNull(response.getHeaders());
+ }
}