You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/05/25 18:34:35 UTC
[nifi] branch main updated: NIFI-9951 Add proxy support to Azure ADLS and Blob v12 processors
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f66540eb6d NIFI-9951 Add proxy support to Azure ADLS and Blob v12 processors
f66540eb6d is described below
commit f66540eb6de45c513d0138f073ed210c73688932
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Wed Apr 20 15:05:05 2022 +0200
NIFI-9951 Add proxy support to Azure ADLS and Blob v12 processors
This closes #5990.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-azure-processors/pom.xml | 6 ++
.../azure/AbstractAzureBlobProcessor_v12.java | 18 ++++--
.../AbstractAzureDataLakeStorageProcessor.java | 52 ++++++----------
.../azure/storage/DeleteAzureBlobStorage_v12.java | 9 +--
.../azure/storage/DeleteAzureDataLakeStorage.java | 4 +-
.../azure/storage/FetchAzureBlobStorage_v12.java | 13 ++--
.../azure/storage/FetchAzureDataLakeStorage.java | 21 +++++--
.../azure/storage/ListAzureBlobStorage_v12.java | 3 +-
.../azure/storage/ListAzureDataLakeStorage.java | 4 +-
.../azure/storage/MoveAzureDataLakeStorage.java | 4 +-
.../azure/storage/PutAzureBlobStorage_v12.java | 17 +++---
.../azure/storage/PutAzureDataLakeStorage.java | 22 +++----
.../azure/storage/utils/AzureStorageUtils.java | 44 ++++++++++++++
.../azure/storage/AbstractAzureStorageIT.java | 71 +++++++++++++++++++---
.../storage/ITDeleteAzureBlobStorage_v12.java | 11 ++++
.../storage/ITDeleteAzureDataLakeStorage.java | 18 ++++++
.../azure/storage/ITFetchAzureBlobStorage_v12.java | 11 ++++
.../azure/storage/ITFetchAzureDataLakeStorage.java | 29 +++++++--
.../azure/storage/ITListAzureBlobStorage_v12.java | 11 ++++
.../azure/storage/ITListAzureDataLakeStorage.java | 12 +++-
.../azure/storage/ITMoveAzureDataLakeStorage.java | 11 ++++
.../azure/storage/ITPutAzureBlobStorage_v12.java | 9 +++
.../azure/storage/ITPutAzureDataLakeStorage.java | 14 ++++-
.../org/apache/nifi/proxy/ProxyConfiguration.java | 15 ++++-
.../java/org/apache/nifi/proxy/SocksVersion.java | 22 +++++++
.../proxy/StandardProxyConfigurationService.java | 26 ++++++--
26 files changed, 377 insertions(+), 100 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 0a1ebcfa70..45bf13c8c0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -188,6 +188,12 @@
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-proxy-configuration</artifactId>
+ <version>1.17.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
index 91e44206c3..991409d5b8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.blob.BlobClient;
@@ -45,6 +47,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER;
@@ -110,12 +113,19 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
}
public static BlobServiceClient createStorageClient(PropertyContext context) {
- AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
- AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails();
+ final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class);
+ final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails();
- BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
+ final BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
clientBuilder.endpoint(String.format("https://%s.%s", credentialsDetails.getAccountName(), credentialsDetails.getEndpointSuffix()));
+ final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
+
+ nettyClientBuilder.proxy(getProxyOptions(context));
+
+ final HttpClient nettyClient = nettyClientBuilder.build();
+ clientBuilder.httpClient(nettyClient);
+
configureCredential(clientBuilder, credentialsService, credentialsDetails);
return clientBuilder.buildClient();
@@ -131,7 +141,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor {
break;
case MANAGED_IDENTITY:
clientBuilder.credential(new ManagedIdentityCredentialBuilder()
- .clientId(credentialsDetails.getManagedIdentityClientId())
+ .clientId(credentialsDetails.getManagedIdentityClientId())
.build());
break;
case SERVICE_PRINCIPAL:
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 39e2ad28bc..3047de5f6e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.azure;
import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
@@ -45,11 +47,11 @@ import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
+import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@@ -94,23 +96,11 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
"Files that could not be written to Azure storage for some reason are transferred to this relationship")
.build();
- private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
- ADLS_CREDENTIALS_SERVICE,
- FILESYSTEM,
- DIRECTORY,
- FILE
- ));
-
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS,
REL_FAILURE
)));
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return PROPERTIES;
- }
-
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
@@ -136,43 +126,41 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
final String endpoint = String.format("https://%s.%s", accountName, endpointSuffix);
- final DataLakeServiceClient storageClient;
+ final DataLakeServiceClientBuilder dataLakeServiceClientBuilder = new DataLakeServiceClientBuilder();
+ dataLakeServiceClientBuilder.endpoint(endpoint);
+
if (StringUtils.isNotBlank(accountKey)) {
- final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
- accountKey);
- storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
- .buildClient();
+ final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
+ dataLakeServiceClientBuilder.credential(credential);
} else if (StringUtils.isNotBlank(sasToken)) {
- storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
- .buildClient();
+ dataLakeServiceClientBuilder.sasToken(sasToken);
} else if (accessToken != null) {
final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
-
- storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
- .buildClient();
+ dataLakeServiceClientBuilder.credential(credential);
} else if (useManagedIdentity) {
final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder()
.clientId(managedIdentityClientId)
.build();
- storageClient = new DataLakeServiceClientBuilder()
- .endpoint(endpoint)
- .credential(misCredential)
- .buildClient();
+ dataLakeServiceClientBuilder.credential(misCredential);
} else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) {
final ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.tenantId(servicePrincipalTenantId)
.clientId(servicePrincipalClientId)
.clientSecret(servicePrincipalClientSecret)
.build();
-
- storageClient = new DataLakeServiceClientBuilder()
- .endpoint(endpoint)
- .credential(credential)
- .buildClient();
+ dataLakeServiceClientBuilder.credential(credential);
} else {
throw new IllegalArgumentException("No valid credentials were provided");
}
+ final NettyAsyncHttpClientBuilder nettyClientBuilder = new NettyAsyncHttpClientBuilder();
+ nettyClientBuilder.proxy(getProxyOptions(context));
+
+ final HttpClient nettyClient = nettyClientBuilder.build();
+ dataLakeServiceClientBuilder.httpClient(nettyClient);
+
+ final DataLakeServiceClient storageClient = dataLakeServiceClientBuilder.buildClient();
+
return storageClient;
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
index a6b056feb5..1ba5f7c7d7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java
@@ -40,8 +40,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class})
+@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
+@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class})
@CapabilityDescription("Deletes the specified blob from Azure Blob Storage. The processor uses Azure Blob Storage client library v12.")
@InputRequirement(Requirement.INPUT_REQUIRED)
public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
@@ -66,11 +66,12 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
BLOB_NAME,
- DELETE_SNAPSHOTS_OPTION
+ DELETE_SNAPSHOTS_OPTION,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
index b929971e8e..cf1ef18f77 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.time.Duration;
import java.util.Arrays;
@@ -77,7 +78,8 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc
FILESYSTEM,
FILESYSTEM_OBJECT_TYPE,
DIRECTORY,
- FILE
+ FILE,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
index a12a8110e3..ac6dcae8b3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java
@@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP;
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
@CapabilityDescription("Retrieves the specified blob from Azure Blob Storage and writes its content to the content of the FlowFile. The processor uses Azure Blob Storage client library v12.")
-@SeeAlso({ ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class })
+@SeeAlso({ListAzureBlobStorage_v12.class, PutAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG),
@@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
- @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) })
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder()
@@ -113,11 +113,12 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
CONTAINER,
BLOB_NAME,
RANGE_START,
- RANGE_LENGTH
+ RANGE_LENGTH,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index 0e98858e16..a8c41e836b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -37,8 +37,10 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -78,13 +80,20 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce
.defaultValue("0")
.build();
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ DIRECTORY,
+ FILE,
+ RANGE_START,
+ RANGE_LENGTH,
+ NUM_RETRIES,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
+ ));
+
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- List<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
- properties.add(RANGE_START);
- properties.add(RANGE_LENGTH);
- properties.add(NUM_RETRIES);
- return properties;
+ return PROPERTIES;
}
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
index 483a3c9314..9da7375ad4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java
@@ -141,7 +141,8 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf
MIN_AGE,
MAX_AGE,
MIN_SIZE,
- MAX_SIZE
+ MAX_SIZE,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
private BlobServiceClient storageClient;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
index 6bb3b0e17d..cc43ffd567 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java
@@ -42,6 +42,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.ADLSFileInfo;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
@@ -143,7 +144,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil
MIN_AGE,
MAX_AGE,
MIN_SIZE,
- MAX_SIZE));
+ MAX_SIZE,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
private static final Set<PropertyDescriptor> LISTING_RESET_PROPERTIES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
ADLS_CREDENTIALS_SERVICE,
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
index cb3d8b6669..c5b38becf6 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java
@@ -36,6 +36,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.util.Arrays;
import java.util.Collections;
@@ -131,7 +132,8 @@ public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProces
DESTINATION_FILESYSTEM,
DESTINATION_DIRECTORY,
FILE,
- CONFLICT_RESOLUTION
+ CONFLICT_RESOLUTION,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index 85fb00b29e..805832ecd8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -63,11 +63,11 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_PRIMARY_URI;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_TIMESTAMP;
-@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class })
+@Tags({"azure", "microsoft", "cloud", "storage", "blob"})
+@SeeAlso({ListAzureBlobStorage_v12.class, FetchAzureBlobStorage_v12.class, DeleteAzureBlobStorage_v12.class})
@CapabilityDescription("Puts content into a blob on Azure Blob Storage. The processor uses Azure Blob Storage client library v12.")
@InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({ @WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
+@WritesAttributes({@WritesAttribute(attribute = ATTR_NAME_CONTAINER, description = ATTR_DESCRIPTION_CONTAINER),
@WritesAttribute(attribute = ATTR_NAME_BLOBNAME, description = ATTR_DESCRIPTION_BLOBNAME),
@WritesAttribute(attribute = ATTR_NAME_PRIMARY_URI, description = ATTR_DESCRIPTION_PRIMARY_URI),
@WritesAttribute(attribute = ATTR_NAME_ETAG, description = ATTR_DESCRIPTION_ETAG),
@@ -75,7 +75,7 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description = ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = ATTR_DESCRIPTION_TIMESTAMP),
- @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH) })
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = ATTR_DESCRIPTION_LENGTH)})
public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
public static final PropertyDescriptor CREATE_CONTAINER = new PropertyDescriptor.Builder()
@@ -87,19 +87,20 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
.allowableValues("true", "false")
.defaultValue("false")
.description("Specifies whether to check if the container exists and to automatically create it if it does not. " +
- "Permission to list containers is required. If false, this check is not made, but the Put operation " +
- "will fail if the container does not exist.")
+ "Permission to list containers is required. If false, this check is not made, but the Put operation " +
+ "will fail if the container does not exist.")
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.CONTAINER,
CREATE_CONTAINER,
- BLOB_NAME
+ BLOB_NAME,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
));
@Override
- public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 9ecbbcc032..3c10d068b0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -33,13 +33,13 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import java.io.BufferedInputStream;
import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -83,18 +83,18 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
.allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
.build();
- private List<PropertyDescriptor> properties;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> props = new ArrayList<>(super.getSupportedPropertyDescriptors());
- props.add(CONFLICT_RESOLUTION);
- properties = Collections.unmodifiableList(props);
- }
+ private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+ ADLS_CREDENTIALS_SERVICE,
+ FILESYSTEM,
+ DIRECTORY,
+ FILE,
+ CONFLICT_RESOLUTION,
+ AzureStorageUtils.PROXY_CONFIGURATION_SERVICE
+ ));
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
+ return PROPERTIES;
}
@Override
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index a73c108acc..654db1469f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.processors.azure.storage.utils;
+import com.azure.core.http.ProxyOptions;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -43,9 +46,11 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
+import org.apache.nifi.proxy.SocksVersion;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.services.azure.storage.AzureStorageEmulatorCredentialsDetails;
+import reactor.netty.http.client.HttpClient;
public final class AzureStorageUtils {
public static final String BLOCK = "Block";
@@ -311,4 +316,43 @@ public final class AzureStorageUtils {
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(processContext);
operationContext.setProxy(proxyConfig.createProxy());
}
+
+ /**
+ *
+ * Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use.
+ *
+ * @param propertyContext is sed to supply Proxy configurations
+ * @return {@link ProxyOptions proxy options}, null if Proxy is not set
+ */
+ public static ProxyOptions getProxyOptions(final PropertyContext propertyContext) {
+ final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(propertyContext);
+
+ if (proxyConfiguration != ProxyConfiguration.DIRECT_CONFIGURATION) {
+
+ final ProxyOptions proxyOptions = new ProxyOptions(
+ getProxyType(proxyConfiguration),
+ new InetSocketAddress(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort()));
+
+ final String proxyUserName = proxyConfiguration.getProxyUserName();
+ final String proxyUserPassword = proxyConfiguration.getProxyUserPassword();
+ if (proxyUserName != null && proxyUserPassword != null) {
+ proxyOptions.setCredentials(proxyUserName, proxyUserPassword);
+ }
+
+ return proxyOptions;
+ }
+
+ return null;
+ }
+
+ private static ProxyOptions.Type getProxyType(ProxyConfiguration proxyConfiguration) {
+ if (proxyConfiguration.getProxyType() == Proxy.Type.HTTP) {
+ return ProxyOptions.Type.HTTP;
+ } else if (proxyConfiguration.getProxyType() == Proxy.Type.SOCKS) {
+ final SocksVersion socksVersion = proxyConfiguration.getSocksVersion();
+ return ProxyOptions.Type.valueOf(socksVersion.name());
+ } else {
+ throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType());
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
index 6dc3bd873a..ab50166c63 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureStorageIT.java
@@ -21,12 +21,13 @@ import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.proxy.StandardProxyConfigurationService;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
-
import org.junit.jupiter.api.BeforeEach;
import java.io.FileInputStream;
@@ -36,32 +37,66 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public abstract class AbstractAzureStorageIT {
- private static final Properties CONFIG;
+ private static final Properties CREDENTIALS_CONFIG;
+ private static final Properties PROXY_CONFIG;
private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+ private static final String PROXY_CONFIGURATION_FILE = System.getProperty("user.home") + "/proxy-configuration.PROPERTIES";
static {
- CONFIG = new Properties();
+ CREDENTIALS_CONFIG = loadConfig(CREDENTIALS_FILE);
+ PROXY_CONFIG = loadConfig(PROXY_CONFIGURATION_FILE);
+ }
+
+ private static Properties loadConfig(String configPath) {
+ Properties loadedProperties = new Properties();
+
assertDoesNotThrow(() -> {
- final FileInputStream fis = new FileInputStream(CREDENTIALS_FILE);
- assertDoesNotThrow(() -> CONFIG.load(fis));
- FileUtils.closeQuietly(fis);
+ final FileInputStream fIS = new FileInputStream(configPath);
+ assertDoesNotThrow(() -> loadedProperties.load(fIS));
+ FileUtils.closeQuietly(fIS);
});
+
+ return loadedProperties;
}
protected String getAccountName() {
- return CONFIG.getProperty("accountName");
+ return CREDENTIALS_CONFIG.getProperty("accountName");
}
protected String getAccountKey() {
- return CONFIG.getProperty("accountKey");
+ return CREDENTIALS_CONFIG.getProperty("accountKey");
}
protected String getEndpointSuffix() {
- String endpointSuffix = CONFIG.getProperty("endpointSuffix");
+ String endpointSuffix = CREDENTIALS_CONFIG.getProperty("endpointSuffix");
return endpointSuffix != null ? endpointSuffix : getDefaultEndpointSuffix();
}
+ protected String getProxyType() {
+ return PROXY_CONFIG.getProperty("proxyType");
+ }
+
+ protected String getSocksVersion() {
+ return PROXY_CONFIG.getProperty("socksVersion");
+ }
+
+ protected String getProxyServerHost() {
+ return PROXY_CONFIG.getProperty("proxyServerHost");
+ }
+
+ protected String getProxyServerPort() {
+ return PROXY_CONFIG.getProperty("proxyServerPort");
+ }
+
+ protected String getProxyUsername() {
+ return PROXY_CONFIG.getProperty("proxyUsername");
+ }
+
+ protected String getProxyUserPassword() {
+ return PROXY_CONFIG.getProperty("proxyUserPassword");
+ }
+
protected abstract String getDefaultEndpointSuffix();
protected TestRunner runner;
@@ -102,4 +137,22 @@ public abstract class AbstractAzureStorageIT {
runner.setProperty(AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, credentialsService.getIdentifier());
}
+
+ protected void configureProxyService() throws InitializationException {
+ final StandardProxyConfigurationService proxyConfigurationService = new StandardProxyConfigurationService();
+ runner.addControllerService("proxy-configuration-service", proxyConfigurationService);
+
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_TYPE, getProxyType());
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.SOCKS_VERSION, getSocksVersion());
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_HOST, getProxyServerHost());
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_SERVER_PORT, getProxyServerPort());
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_NAME, getProxyUsername());
+ runner.setProperty(proxyConfigurationService, StandardProxyConfigurationService.PROXY_USER_PASSWORD, getProxyUserPassword());
+
+ runner.assertValid(proxyConfigurationService);
+
+ runner.enableControllerService(proxyConfigurationService);
+
+ runner.setProperty(AzureStorageUtils.PROXY_CONFIGURATION_SERVICE, proxyConfigurationService.getIdentifier());
+ }
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java
index 50a8d0bc52..2d7e4f13c4 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureBlobStorage_v12.java
@@ -56,6 +56,17 @@ public class ITDeleteAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME);
}
+ @Test
+ public void testDeleteBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
+ uploadBlob(BLOB_NAME, BLOB_DATA);
+
+ configureProxyService();
+
+ runProcessor();
+
+ assertSuccess(BLOB_NAME);
+ }
+
@Test
public void testDeleteBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
index ac6c75a29d..0c7864d963 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java
@@ -22,6 +22,7 @@ import com.azure.storage.file.datalake.models.DataLakeStorageException;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
@@ -57,6 +58,23 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
}
+ @Test
+ public void testDeleteDirectoryWithFilesUsingProxyConfigurationService() throws InitializationException {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String fileContent = "AzureFileContent";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, fileContent);
+
+ configureProxyService();
+
+ // WHEN
+ // THEN
+ testSuccessfulDelete(fileSystemName, directory, null, inputFlowFileContent, inputFlowFileContent);
+ }
+
@Test
public void testDeleteEmptyDirectoryWithFSTypeDirectory() {
// GIVEN
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
index 64f2de3c04..4218b6c726 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage_v12.java
@@ -53,6 +53,17 @@ public class ITFetchAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT
assertSuccess(BLOB_NAME, BLOB_DATA);
}
+ @Test
+ public void testFetchBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
+ uploadBlob(BLOB_NAME, BLOB_DATA);
+
+ configureProxyService();
+
+ runProcessor();
+
+ assertSuccess(BLOB_NAME, BLOB_DATA);
+ }
+
@Test
public void testFetchBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
index 1b665d4109..6d50691c6f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java
@@ -21,6 +21,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.junit.jupiter.api.Test;
@@ -56,6 +57,22 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
}
+ @Test
+ public void testFetchFileFromDirectoryUsingProxyConfigurationService() throws InitializationException {
+ // GIVEN
+ String directory = "TestDirectory";
+ String filename = "testFile.txt";
+ String inputFlowFileContent = "InputFlowFileContent";
+
+ createDirectoryAndUploadFile(directory, filename, TEST_FILE_CONTENT);
+
+ configureProxyService();
+
+ // WHEN
+ // THEN
+ testSuccessfulFetch(fileSystemName, directory, filename, inputFlowFileContent, TEST_FILE_CONTENT);
+ }
+
@Test
public void testFetchFileFromRoot() {
// GIVEN
@@ -345,7 +362,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeZeroOne() throws Exception {
+ public void testFetchWithRangeZeroOne() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@@ -359,7 +376,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeOneOne() throws Exception {
+ public void testFetchWithRangeOneOne() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@@ -373,7 +390,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeTwentyThreeTwentySix() throws Exception {
+ public void testFetchWithRangeTwentyThreeTwentySix() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@@ -387,7 +404,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeLengthGreater() throws Exception {
+ public void testFetchWithRangeLengthGreater() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@@ -401,7 +418,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeLengthUnset() throws Exception {
+ public void testFetchWithRangeLengthUnset() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
@@ -415,7 +432,7 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT
}
@Test
- public void testFetchWithRangeStartOutOfRange() throws Exception {
+ public void testFetchWithRangeStartOutOfRange() {
// GIVEN
String directory= "A Test Directory";
String filename = "testFile.txt";
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
index 942e834074..5655cf622f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage_v12.java
@@ -55,6 +55,17 @@ public class ITListAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4);
}
+ @Test
+ public void testListBlobsUsingProxyConfigurationService() throws Exception {
+ uploadBlobs();
+
+ configureProxyService();
+
+ runProcessor();
+
+ assertSuccess(BLOB_NAME_1, BLOB_NAME_2, BLOB_NAME_3, BLOB_NAME_4);
+ }
+
@Test
public void testListBlobsWithPrefix_1() throws Exception {
uploadBlobs();
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
index 09ca9b2cfe..6c45663e33 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java
@@ -89,6 +89,16 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
}
+ @Test
+ public void testListRootRecursiveUsingProxyConfigurationService() throws Exception {
+ runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
+ configureProxyService();
+
+ runProcessor();
+
+ assertSuccess("file1", "file2", "dir1/file11", "dir1/file12", "dir1/dir11/file111", "dir 2/file 21");
+ }
+
@Test
public void testListRootNonRecursive() throws Exception {
runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "");
@@ -287,7 +297,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
}
}
- private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) throws Exception {
+ private void assertFlowFile(TestFile testFile, MockFlowFile flowFile) {
flowFile.assertAttributeEquals(ATTR_NAME_FILESYSTEM, fileSystemName);
flowFile.assertAttributeEquals(ATTR_NAME_FILE_PATH, testFile.getFilePath());
flowFile.assertAttributeEquals(ATTR_NAME_DIRECTORY, testFile.getDirectory());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
index b98317f733..69ed942d3e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java
@@ -81,6 +81,17 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
}
+ @Test
+ public void testMoveFileToExistingDirectoryUsingProxyConfigurationService() throws Exception {
+ createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
+ createDirectory(DESTINATION_DIRECTORY);
+ configureProxyService();
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(SOURCE_DIRECTORY, DESTINATION_DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
@Test
public void testMoveFileToExistingDirectoryWithReplaceResolution() throws Exception {
createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
index 701df85f0e..762012246f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java
@@ -54,6 +54,15 @@ public class ITPutAzureBlobStorage_v12 extends AbstractAzureBlobStorage_v12IT {
assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
}
+ @Test
+ public void testPutBlobWithSimpleNameUsingProxyConfigurationService() throws Exception {
+ configureProxyService();
+
+ runProcessor(BLOB_DATA);
+
+ assertSuccess(getContainerName(), BLOB_NAME, BLOB_DATA);
+ }
+
@Test
public void testPutBlobWithCompoundName() throws Exception {
String blobName = "dir1/dir2/blob1";
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 18f4341e8b..20efbb4372 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -39,9 +39,9 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doThrow;
@@ -78,6 +78,16 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
}
+ @Test
+ public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception {
+ fileSystemClient.createDirectory(DIRECTORY);
+ configureProxyService();
+
+ runProcessor(FILE_DATA);
+
+ assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+ }
+
@Test
public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception {
fileSystemClient.createDirectory(DIRECTORY);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java
index e6d498c20c..0b9086275c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/ProxyConfiguration.java
@@ -50,6 +50,10 @@ public class ProxyConfiguration {
description.append(" Supported proxies: ");
description.append(specs.stream().map(ProxySpec::getDisplayName).collect(Collectors.joining(", ")));
+ if (specs.contains(SOCKS)) {
+ description.append(" In case of SOCKS, it is not guaranteed that the selected SOCKS Version will be used by the processor.");
+ }
+
return new PropertyDescriptor.Builder()
.fromPropertyDescriptor(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
.description(description.toString())
@@ -62,7 +66,7 @@ public class ProxyConfiguration {
* @return sorted unique specs
*/
private static Set<ProxySpec> getUniqueProxySpecs(ProxySpec ... _specs) {
- final Set<ProxySpec> specs = Arrays.stream(_specs).sorted().collect(Collectors.toSet());
+ final Set<ProxySpec> specs = Arrays.stream(_specs).collect(Collectors.toSet());
if (specs.contains(HTTP_AUTH)) {
specs.remove(HTTP);
}
@@ -148,6 +152,7 @@ public class ProxyConfiguration {
}
private Proxy.Type proxyType = Proxy.Type.DIRECT;
+ private SocksVersion socksVersion;
private String proxyServerHost;
private Integer proxyServerPort;
private String proxyUserName;
@@ -161,6 +166,14 @@ public class ProxyConfiguration {
this.proxyType = proxyType;
}
+ public SocksVersion getSocksVersion() {
+ return socksVersion;
+ }
+
+ public void setSocksVersion(SocksVersion socksVersion) {
+ this.socksVersion = socksVersion;
+ }
+
public String getProxyServerHost() {
return proxyServerHost;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java
new file mode 100644
index 0000000000..27faa48861
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-api/src/main/java/org/apache/nifi/proxy/SocksVersion.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.proxy;
+
+public enum SocksVersion {
+ SOCKS4,
+ SOCKS5
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java
index a3b3f516b3..28d49026fb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-proxy-configuration-bundle/nifi-proxy-configuration/src/main/java/org/apache/nifi/proxy/StandardProxyConfigurationService.java
@@ -37,7 +37,7 @@ import java.util.List;
@Tags({"Proxy"})
public class StandardProxyConfigurationService extends AbstractControllerService implements ProxyConfigurationService {
- static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
.name("proxy-type")
.displayName("Proxy Type")
.description("Proxy type.")
@@ -46,7 +46,17 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.required(true)
.build();
- static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor SOCKS_VERSION = new PropertyDescriptor.Builder()
+ .name("socks-version")
+ .displayName("SOCKS Version")
+ .description("SOCKS Protocol Version")
+ .allowableValues(SocksVersion.values())
+ .defaultValue(SocksVersion.SOCKS5.name())
+ .dependsOn(PROXY_TYPE, Proxy.Type.SOCKS.name())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor PROXY_SERVER_HOST = new PropertyDescriptor.Builder()
.name("proxy-server-host")
.displayName("Proxy Server Host")
.description("Proxy server hostname or ip-address.")
@@ -54,7 +64,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROXY_SERVER_PORT = new PropertyDescriptor.Builder()
.name("proxy-server-port")
.displayName("Proxy Server Port")
.description("Proxy server port number.")
@@ -62,7 +72,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROXY_USER_NAME = new PropertyDescriptor.Builder()
.name("proxy-user-name")
.displayName("Proxy User Name")
.description("The name of the proxy client for user authentication.")
@@ -70,7 +80,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
+ public static final PropertyDescriptor PROXY_USER_PASSWORD = new PropertyDescriptor.Builder()
.name("proxy-user-password")
.displayName("Proxy User Password")
.description("The password of the proxy client for user authentication.")
@@ -85,6 +95,7 @@ public class StandardProxyConfigurationService extends AbstractControllerService
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(PROXY_TYPE);
+ properties.add(SOCKS_VERSION);
properties.add(PROXY_SERVER_HOST);
properties.add(PROXY_SERVER_PORT);
properties.add(PROXY_USER_NAME);
@@ -95,7 +106,10 @@ public class StandardProxyConfigurationService extends AbstractControllerService
@OnEnabled
public void setConfiguredValues(final ConfigurationContext context) {
configuration = new ProxyConfiguration();
- configuration.setProxyType(Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue()));
+
+ final Proxy.Type proxyType = Proxy.Type.valueOf(context.getProperty(PROXY_TYPE).getValue());
+ configuration.setProxyType(proxyType);
+ configuration.setSocksVersion(proxyType == Proxy.Type.SOCKS ? SocksVersion.valueOf(context.getProperty(SOCKS_VERSION).getValue()) : null);
configuration.setProxyServerHost(context.getProperty(PROXY_SERVER_HOST).evaluateAttributeExpressions().getValue());
configuration.setProxyServerPort(context.getProperty(PROXY_SERVER_PORT).evaluateAttributeExpressions().asInteger());
configuration.setProxyUserName(context.getProperty(PROXY_USER_NAME).evaluateAttributeExpressions().getValue());