You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/05/19 09:54:03 UTC
[nifi] branch master updated: NIFI-7409: Azure managed identity support to Azure Datalake processors
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 852715a NIFI-7409: Azure managed identity support to Azure Datalake processors
852715a is described below
commit 852715aadd9e989d37aeadecfe92093e212a5ad1
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Fri May 1 18:55:50 2020 +0000
NIFI-7409: Azure managed identity support to Azure Datalake processors
NIFI-7409: review changes
NIFI-7409: ordering import statements
NIFI-7409: changed validateCredentialProperties logic
This closes #4249.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-azure-processors/pom.xml | 16 +--
.../AbstractAzureDataLakeStorageProcessor.java | 111 +++++++++++++++------
...eStorageCredentialsControllerServiceLookup.java | 3 +-
.../storage/TestAbstractAzureDataLakeStorage.java | 19 +++-
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 42 ++++++++
5 files changed, 147 insertions(+), 44 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 0a3602d..2cff8ee 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -57,6 +57,16 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-core</artifactId>
+ <version>1.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-identity</artifactId>
+ <version>1.0.6</version>
+ </dependency>
+ <dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>${azure-eventhubs.version}</version>
@@ -75,12 +85,6 @@
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.1.1</version>
</dependency>
- <!-- overriding jackson-core in azure-storage -->
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-core</artifactId>
- <version>2.10.3</version>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
index 40d276c..af75f99 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java
@@ -16,30 +16,32 @@
*/
package org.apache.nifi.processors.azure;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.azure.identity.ManagedIdentityCredential;
+import com.azure.identity.ManagedIdentityCredentialBuilder;
+import com.azure.storage.common.StorageSharedKeyCredential;
+import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.context.PropertyContext;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.commons.lang3.StringUtils;
-
-import com.azure.storage.common.StorageSharedKeyCredential;
-import com.azure.storage.file.datalake.DataLakeServiceClient;
-import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.Map;
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {
@@ -85,6 +87,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
+ .name("use-managed-identity")
+ .displayName("Use Azure Managed Identity")
+ .description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
+ .required(false).defaultValue("false").allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+
public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
.description("Name of the Azure Storage File System. It is assumed to be already existing.")
@@ -110,6 +119,15 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.defaultValue("${azure.filename}")
.build();
+ public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
+ .name("endpoint-suffix").displayName("Endpoint Suffix")
+ .description("Endpoint Suffix")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .required(false)
+ .defaultValue("dfs.core.windows.net")
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description(
"Files that have been successfully written to Azure storage are transferred to this relationship")
.build();
@@ -118,9 +136,14 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
.build();
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(
- Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME, AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
- AbstractAzureDataLakeStorageProcessor.SAS_TOKEN, AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
- AbstractAzureDataLakeStorageProcessor.DIRECTORY, AbstractAzureDataLakeStorageProcessor.FILE));
+ Arrays.asList(AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME,
+ AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY,
+ AbstractAzureDataLakeStorageProcessor.SAS_TOKEN,
+ AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY,
+ AbstractAzureDataLakeStorageProcessor.ENDPOINT_SUFFIX,
+ AbstractAzureDataLakeStorageProcessor.FILESYSTEM,
+ AbstractAzureDataLakeStorageProcessor.DIRECTORY,
+ AbstractAzureDataLakeStorageProcessor.FILE));
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(
@@ -134,17 +157,32 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public static Collection<ValidationResult> validateCredentialProperties(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
- final String accountName = validationContext.getProperty(ACCOUNT_NAME).getValue();
- final String accountKey = validationContext.getProperty(ACCOUNT_KEY).getValue();
- final String sasToken = validationContext.getProperty(SAS_TOKEN).getValue();
-
- if (StringUtils.isNotBlank(accountName)
- && ((StringUtils.isNotBlank(accountKey) && StringUtils.isNotBlank(sasToken)) || (StringUtils.isBlank(accountKey) && StringUtils.isBlank(sasToken)))) {
- results.add(new ValidationResult.Builder().subject("Azure Storage Credentials").valid(false)
- .explanation("either " + ACCOUNT_NAME.getDisplayName() + " with " + ACCOUNT_KEY.getDisplayName() +
- " or " + ACCOUNT_NAME.getDisplayName() + " with " + SAS_TOKEN.getDisplayName() +
- " must be specified, not both")
- .build());
+
+ final boolean useManagedIdentity = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final boolean accountKeyIsSet = validationContext.getProperty(ACCOUNT_KEY).isSet();
+ final boolean sasTokenIsSet = validationContext.getProperty(SAS_TOKEN).isSet();
+
+ int credential_config_found = 0;
+ if(useManagedIdentity) credential_config_found++;
+ if(accountKeyIsSet) credential_config_found++;
+ if(sasTokenIsSet) credential_config_found++;
+
+ if(credential_config_found == 0){
+ final String msg = String.format(
+ "At least one of ['%s', '%s', '%s'] should be set",
+ ACCOUNT_KEY.getDisplayName(),
+ SAS_TOKEN.getDisplayName(),
+ USE_MANAGED_IDENTITY.getDisplayName()
+ );
+ results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
+ } else if(credential_config_found > 1) {
+ final String msg = String.format(
+ "Only one of ['%s', '%s', '%s'] should be set",
+ ACCOUNT_KEY.getDisplayName(),
+ SAS_TOKEN.getDisplayName(),
+ USE_MANAGED_IDENTITY.getDisplayName()
+ );
+ results.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
}
return results;
}
@@ -154,7 +192,9 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
final String accountName = context.getProperty(ACCOUNT_NAME).evaluateAttributeExpressions(attributes).getValue();
final String accountKey = context.getProperty(ACCOUNT_KEY).evaluateAttributeExpressions(attributes).getValue();
final String sasToken = context.getProperty(SAS_TOKEN).evaluateAttributeExpressions(attributes).getValue();
- final String endpoint = String.format("https://%s.dfs.core.windows.net", accountName);
+ final String endpointSuffix = context.getProperty(ENDPOINT_SUFFIX).evaluateAttributeExpressions(attributes).getValue();
+ final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
+ final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
@@ -164,6 +204,13 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
} else if (StringUtils.isNotBlank(sasToken)) {
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient();
+ } else if(useManagedIdentity){
+ final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder()
+ .build();
+ storageClient = new DataLakeServiceClientBuilder()
+ .endpoint(endpoint)
+ .credential(misCrendential)
+ .buildClient();
} else {
throw new IllegalArgumentException(String.format("Either '%s' or '%s' must be defined.",
ACCOUNT_KEY.getDisplayName(), SAS_TOKEN.getDisplayName()));
@@ -181,4 +228,4 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
index 4ac2f07..1cfe1a7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageCredentialsControllerServiceLookup.java
@@ -16,12 +16,13 @@
*/
package org.apache.nifi.services.azure.storage;
+import java.util.Map;
+
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
-import java.util.Map;
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
@CapabilityDescription("Provides an AzureStorageCredentialsService that can be used to dynamically select another AzureStorageCredentialsService. " +
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
index 59800bb..960b1fe 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java
@@ -16,17 +16,18 @@
*/
package org.apache.nifi.processors.azure.storage;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_KEY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ACCOUNT_NAME;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM;
import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.SAS_TOKEN;
+import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.USE_MANAGED_IDENTITY;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
public class TestAbstractAzureDataLakeStorage {
@@ -58,6 +59,14 @@ public class TestAbstractAzureDataLakeStorage {
}
@Test
+ public void testValidWhenAccountNameAndUseManagedIdentity() {
+ runner.removeProperty(ACCOUNT_KEY);
+ runner.setProperty(USE_MANAGED_IDENTITY, "true");
+
+ runner.assertValid();
+ }
+
+ @Test
public void testNotValidWhenNoAccountNameSpecified() {
runner.removeProperty(ACCOUNT_NAME);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index d4983e1..c5ecc49 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -27,6 +27,7 @@
<properties>
<azure-storage.version>8.4.0</azure-storage.version>
+ <jackson.version>2.10.3</jackson.version>
</properties>
<modules>
@@ -50,6 +51,47 @@
</exclusion>
</exclusions>
</dependency>
+ <!-- dependency convergency resolution -->
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-xml</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-jaxb-annotations</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-jsr310</artifactId>
+ <version>${jackson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-text</artifactId>
+ <version>1.8</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>