You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/30 21:20:11 UTC
[nifi] branch master updated: NIFI-6149: Azure EventHub Managed
identities support patch
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new aa1e272 NIFI-6149: Azure EventHub Managed identities support patch
aa1e272 is described below
commit aa1e272052e0c271458ee458a79a1fecbc7f7376
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Wed Apr 22 18:42:35 2020 +0000
NIFI-6149: Azure EventHub Managed identities support patch
review changes
additional review changes
NIFI-6149: typo fixes
This closes #4226.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi-azure-processors/pom.xml | 10 ++-
.../azure/eventhub/ConsumeAzureEventHub.java | 44 ++++++-----
.../azure/eventhub/GetAzureEventHub.java | 46 +++++++----
.../azure/eventhub/PutAzureEventHub.java | 49 ++++++++----
.../azure/eventhub/utils/AzureEventHubUtils.java | 90 ++++++++++++++++++++++
.../azure/eventhub/GetAzureEventHubTest.java | 18 ++++-
.../azure/eventhub/PutAzureEventHubTest.java | 9 +++
.../azure/eventhub/TestConsumeAzureEventHub.java | 34 +++++++-
8 files changed, 246 insertions(+), 54 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 5e909a5..b0e80df 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -20,8 +20,8 @@
<artifactId>nifi-azure-processors</artifactId>
<packaging>jar</packaging>
<properties>
- <azure-eventhubs.version>2.3.2</azure-eventhubs.version>
- <azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
+ <azure-eventhubs.version>3.1.1</azure-eventhubs.version>
+ <azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version>
</properties>
<dependencies>
<dependency>
@@ -89,6 +89,12 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <version>1.12.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index f1e3a64..7873e4c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -23,7 +23,6 @@ import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
-import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -76,6 +75,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.util.StringUtils.isEmpty;
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@@ -115,17 +115,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .required(true)
+ .required(false)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY)
.name("event-hub-shared-access-policy-primary-key")
- .displayName("Shared Access Policy Primary Key")
- .description("The primary key of the shared access policy.")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .sensitive(true)
- .required(true)
.build();
+ static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("event-hub-consumer-group")
.displayName("Consumer Group")
@@ -261,7 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static {
PROPERTIES = Collections.unmodifiableList(Arrays.asList(
- NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
+ NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
RECORD_READER, RECORD_WRITER,
INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
@@ -335,6 +331,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.valid(false)
.build());
}
+ results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
return results;
}
@@ -347,9 +344,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
}
}
- public class EventProcessorFactory implements IEventProcessorFactory {
+ public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
@Override
- public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
+ public EventProcessor createEventProcessor(PartitionContext context) throws Exception {
final EventProcessor eventProcessor = new EventProcessor();
return eventProcessor;
}
@@ -581,12 +578,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
- final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
- validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
-
- final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
- validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
-
final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
@@ -627,9 +618,22 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
- final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
-
- eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
+ final String connectionString;
+ final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ if(useManagedIdentity) {
+ connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, eventHubName);
+ } else {
+ final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+ validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
+ final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
+ validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
+ connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey);
+ }
+ eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
+ .newBuilder(consumerHostname, consumerGroupName)
+ .useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null)
+ .useEventHubConnectionString(connectionString, eventHubName)
+ .build();
options.setExceptionNotification(e -> {
getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" +
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index cb8c9ac..78a3327 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -54,6 +55,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@@ -63,6 +66,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. "
@@ -103,16 +107,10 @@ public class GetAzureEventHub extends AbstractProcessor {
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .required(true)
- .build();
- static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
- .name("Shared Access Policy Primary Key")
- .description("The primary key of the shared access policy")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .sensitive(true)
- .required(true)
+ .required(false)
.build();
+ static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
+ static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions")
@@ -163,6 +161,7 @@ public class GetAzureEventHub extends AbstractProcessor {
.description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
.build();
+
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private volatile Instant configuredEnqueueTime;
@@ -184,6 +183,7 @@ public class GetAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
+ _propertyDescriptors.add(USE_MANAGED_IDENTITY);
_propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP);
_propertyDescriptors.add(ENQUEUE_TIME);
@@ -207,10 +207,16 @@ public class GetAzureEventHub extends AbstractProcessor {
return propertyDescriptors;
}
+ @Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+     // Delegate the SAS policy/key versus managed-identity consistency check to the shared helper.
+     return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
+ }
+
protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
try {
- EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
- eventHubClient = EventHubClient.createSync(connectionString, executor);
+ EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
+ eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
} catch (IOException | EventHubException e) {
throw new ProcessException(e);
}
@@ -301,11 +307,23 @@ public class GetAzureEventHub extends AbstractProcessor {
}
this.partitionNames = partitionNames;
- final String policyName = context.getProperty(ACCESS_POLICY).getValue();
- final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+ final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final String connectionString;
+
+ if(useManagedIdentity){
+ connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName);
+ } else {
+ final String policyName = context.getProperty(ACCESS_POLICY).getValue();
+ final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ connectionString = new ConnectionStringBuilder()
+ .setEndpoint(new URI("amqps://"+namespace+serviceBusEndpoint))
+ .setEventHubName(eventHubName)
+ .setSasKeyName(policyName)
+ .setSasKey(policyKey).toString();
+ }
if(context.getProperty(ENQUEUE_TIME).isSet()) {
configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
@@ -324,8 +342,6 @@ public class GetAzureEventHub extends AbstractProcessor {
}
executor = Executors.newScheduledThreadPool(4);
- final String connectionString = new ConnectionStringBuilder().setEndpoint(
- new URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
setupReceiver(connectionString, executor);
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index ec47384..51bb5bf 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -16,8 +16,10 @@
*/
package org.apache.nifi.processors.azure.eventhub;
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
@@ -53,6 +54,8 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
@@ -91,16 +94,11 @@ public class PutAzureEventHub extends AbstractProcessor {
.description("The name of the shared access policy. This policy must have Send claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .required(true)
- .build();
- static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
- .name("Shared Access Policy Primary Key")
- .description("The primary key of the shared access policy")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .sensitive(true)
- .required(true)
+ .required(false)
.build();
+ static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
+ static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
+
static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
.name("partitioning-key-attribute-name")
.displayName("Partitioning Key Attribute Name")
@@ -144,6 +142,7 @@ public class PutAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NAMESPACE);
_propertyDescriptors.add(ACCESS_POLICY);
_propertyDescriptors.add(POLICY_PRIMARY_KEY);
+ _propertyDescriptors.add(USE_MANAGED_IDENTITY);
_propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
_propertyDescriptors.add(MAX_BATCH_SIZE);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@@ -178,6 +177,13 @@ public class PutAzureEventHub extends AbstractProcessor {
}
@Override
+ protected Collection<ValidationResult> customValidate(ValidationContext context) {
+     // The shared helper decides whether the SAS policy/key and managed-identity settings are consistent.
+     return AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
+ }
+
+
+ @Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
try {
populateSenderQueue(context);
@@ -314,8 +320,15 @@ public class PutAzureEventHub extends AbstractProcessor {
final int numThreads = context.getMaxConcurrentTasks();
senderQueue = new LinkedBlockingQueue<>(numThreads);
executor = Executors.newScheduledThreadPool(4);
- final String policyName = context.getProperty(ACCESS_POLICY).getValue();
- final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ final boolean useManagedIdentiy = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+ final String policyName, policyKey;
+ if(useManagedIdentiy) {
+ policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+ policyKey =null;
+ } else {
+ policyName = context.getProperty(ACCESS_POLICY).getValue();
+ policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+ }
final String namespace = context.getProperty(NAMESPACE).getValue();
final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
for (int i = 0; i < numThreads; i++) {
@@ -345,8 +358,14 @@ public class PutAzureEventHub extends AbstractProcessor {
throws ProcessException{
try {
- EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
- return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executor);
+ EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
+ final String connectionString;
+ if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) {
+ connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
+ } else{
+ connectionString = getConnectionString(namespace, eventHubName, policyName, policyKey);
+ }
+ return EventHubClient.createFromConnectionStringSync(connectionString, executor);
} catch (IOException | EventHubException | IllegalConnectionStringFormatException e) {
getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e.getMessage()}, e);
throw new ProcessException(e);
@@ -354,7 +373,7 @@ public class PutAzureEventHub extends AbstractProcessor {
}
protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
- return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
+ return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey);
}
/**
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
new file mode 100644
index 0000000..d95e27f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+/** Shared property descriptors, credential validation and connection-string helpers for the Azure Event Hub processors. */
+public final class AzureEventHubUtils {
+    /** Authentication value handed to the connection string builder to select Azure managed identity. */
+    public static final String MANAGED_IDENTITY_POLICY = ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION;
+
+    /** Optional, sensitive primary key of the shared access policy; expression language is not supported. */
+    public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Primary Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true).required(false).build();
+
+    /** Optional true/false property, "false" by default, that switches the processors to managed identity. */
+    public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
+            .name("use-managed-identity").displayName("Use Azure Managed Identity")
+            .description("Choose whether or not to use the managed identity of Azure VM/VMSS")
+            .required(false).defaultValue("false").allowableValues("true", "false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+
+    /** Not instantiable: this class only holds constants and static helpers. */
+    private AzureEventHubUtils() { }
+
+    /**
+     * Checks that exactly one credential style is configured: managed identity, or policy name plus policy key.
+     * @param accessPolicyDescriptor the calling processor's shared access policy name property
+     * @param policyKeyDescriptor the calling processor's shared access policy key property
+     * @param context validation context the three property values are read from
+     * @return an empty list when consistent, otherwise one invalid result with subject "Credentials config"
+     */
+    public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor,
+            PropertyDescriptor policyKeyDescriptor, ValidationContext context) {
+        final List<ValidationResult> retVal = new ArrayList<>();
+        final boolean accessPolicyIsSet = context.getProperty(accessPolicyDescriptor).isSet();
+        final boolean policyKeyIsSet = context.getProperty(policyKeyDescriptor).isSet();
+        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final String template;
+        if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet)) {
+            template = "('%s') and ('%s' with '%s') fields cannot be set at the same time.";
+        } else if (!useManagedIdentity && !(accessPolicyIsSet && policyKeyIsSet)) {
+            template = "Either ('%s') or ('%s' with '%s') must be set.";
+        } else {
+            return retVal;
+        }
+        // Name the key through the descriptor the caller passed in rather than the shared constant.
+        final String msg = String.format(template, USE_MANAGED_IDENTITY.getDisplayName(),
+                accessPolicyDescriptor.getDisplayName(), policyKeyDescriptor.getDisplayName());
+        retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
+        return retVal;
+    }
+
+    /** Builds a connection string for the namespace and event hub that authenticates via managed identity. */
+    public static String getManagedIdentityConnectionString(final String namespace, final String eventHubName) {
+        return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setAuthentication(MANAGED_IDENTITY_POLICY).toString();
+    }
+
+    /** Builds a connection string for the namespace and event hub that authenticates with a SAS policy name and key. */
+    public static String getSharedAccessSignatureConnectionString(final String namespace, final String eventHubName, final String sasName, final String sasKey) {
+        return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(sasName).setSasKey(sasKey).toString();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 6e55f60..88bcfa8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -78,7 +78,23 @@ public class GetAzureEventHubTest {
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
testRunner.assertValid();
}
-
+ @Test
+ public void testProcessorConfigValidityWithManagedIdentityFlag() { // this method sets no SAS policy name or key
+     testRunner.setProperty(GetAzureEventHub.EVENT_HUB_NAME, eventHubName); // use this processor's own descriptors
+     testRunner.assertNotValid();
+     testRunner.setProperty(GetAzureEventHub.NAMESPACE, namespaceName);
+     testRunner.assertNotValid();
+     testRunner.setProperty(GetAzureEventHub.USE_MANAGED_IDENTITY, "true");
+     testRunner.assertNotValid(); // validity is only expected once the partition count is set below
+     testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS, "4");
+     testRunner.assertValid();
+     testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME, "2015-12-22T21:55:10.000Z"); // the remaining settings must not break validity
+     testRunner.assertValid();
+     testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
+     testRunner.assertValid();
+     testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT, "10000");
+     testRunner.assertValid();
+ }
@Test
public void verifyRelationships(){
assert(1 == processor.getRelationships().size());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 83ed649..183b80d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -89,6 +89,15 @@ public class PutAzureEventHubTest {
testRunner.assertValid();
}
@Test
+ public void testProcessorConfigValidityWithManagedIdentityFlag() { // this method sets no SAS policy name or key
+ testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
+ testRunner.assertNotValid();
+ testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
+ testRunner.assertNotValid(); // hub name and namespace alone are expected to be rejected
+ testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true");
+ testRunner.assertValid(); // switching on managed identity is expected to complete the configuration
+ }
+ @Test
public void verifyRelationships(){
assert(2 == processor.getRelationships().size());
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index e7edef7..99541a5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -39,6 +40,8 @@ import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.SharedSessionState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -64,8 +67,15 @@ import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
public class TestConsumeAzureEventHub {
+ private static final String namespaceName = "nifi-azure-hub";
+ private static final String eventHubName = "get-test";
+ private static final String storageAccountName = "test-sa";
+ private static final String storageAccountKey = "test-sa-key";
+
private ConsumeAzureEventHub.EventProcessor eventProcessor;
private MockProcessSession processSession;
private SharedSessionState sharedState;
@@ -97,7 +107,29 @@ public class TestConsumeAzureEventHub {
when(partitionContext.getPartitionId()).thenReturn("partition-id");
when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
}
-
+ @Test
+ public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException {
+ TestRunner testRunner = TestRunners.newTestRunner(processor); // local runner built around the shared processor field
+ testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+ testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+ testRunner.assertNotValid();
+ final MockRecordParser reader = new MockRecordParser(); // mock record services registered below as "reader" and "writer"
+ final MockRecordWriter writer = new MockRecordWriter();
+ testRunner.addControllerService("writer", writer);
+ testRunner.enableControllerService(writer);
+ testRunner.addControllerService("reader", reader);
+ testRunner.enableControllerService(reader);
+ testRunner.setProperty(ConsumeAzureEventHub.RECORD_WRITER, "writer");
+ testRunner.setProperty(ConsumeAzureEventHub.RECORD_READER, "reader");
+ testRunner.assertNotValid();
+ testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+ testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
+ testRunner.assertNotValid(); // still expected to be invalid: no credential setting has been made yet
+ // No SAS policy name or key is set in this method; the managed identity flag alone is expected to make it valid.
+ testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
+ testRunner.assertValid();
+ }
@Test
public void testReceiveOne() throws Exception {
final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));