You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/04/30 21:20:11 UTC

[nifi] branch master updated: NIFI-6149: Azure EventHub Managed identities support patch

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new aa1e272  NIFI-6149: Azure EventHub Managed identities support patch
aa1e272 is described below

commit aa1e272052e0c271458ee458a79a1fecbc7f7376
Author: sjyang18 <il...@hotmail.com>
AuthorDate: Wed Apr 22 18:42:35 2020 +0000

    NIFI-6149: Azure EventHub Managed identities support patch
    
    review changes
    additional review changes
    NIFI-6149: typo fixes
    
    This closes #4226.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi-azure-processors/pom.xml                  | 10 ++-
 .../azure/eventhub/ConsumeAzureEventHub.java       | 44 ++++++-----
 .../azure/eventhub/GetAzureEventHub.java           | 46 +++++++----
 .../azure/eventhub/PutAzureEventHub.java           | 49 ++++++++----
 .../azure/eventhub/utils/AzureEventHubUtils.java   | 90 ++++++++++++++++++++++
 .../azure/eventhub/GetAzureEventHubTest.java       | 18 ++++-
 .../azure/eventhub/PutAzureEventHubTest.java       |  9 +++
 .../azure/eventhub/TestConsumeAzureEventHub.java   | 34 +++++++-
 8 files changed, 246 insertions(+), 54 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 5e909a5..b0e80df 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -20,8 +20,8 @@
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
     <properties>
-        <azure-eventhubs.version>2.3.2</azure-eventhubs.version>
-        <azure-eventhubs-eph.version>2.5.2</azure-eventhubs-eph.version>
+        <azure-eventhubs.version>3.1.1</azure-eventhubs.version>
+        <azure-eventhubs-eph.version>3.1.1</azure-eventhubs-eph.version>
     </properties>
     <dependencies>
         <dependency>
@@ -89,6 +89,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.12.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-processor-utils</artifactId>
             <version>1.12.0-SNAPSHOT</version>
         </dependency>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index f1e3a64..7873e4c 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -23,7 +23,6 @@ import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
 import com.microsoft.azure.eventprocessorhost.IEventProcessor;
 import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
 import com.microsoft.azure.eventprocessorhost.PartitionContext;
-import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
@@ -76,6 +75,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.nifi.util.StringUtils.isEmpty;
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 
 @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
 @CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@@ -115,17 +115,13 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
             .description("The name of the shared access policy. This policy must have Listen claims.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .required(true)
+            .required(false)
             .build();
     static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AzureEventHubUtils.POLICY_PRIMARY_KEY)
             .name("event-hub-shared-access-policy-primary-key")
-            .displayName("Shared Access Policy Primary Key")
-            .description("The primary key of the shared access policy.")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .sensitive(true)
-            .required(true)
             .build();
+    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
     static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
             .name("event-hub-consumer-group")
             .displayName("Consumer Group")
@@ -261,7 +257,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
     static {
         PROPERTIES = Collections.unmodifiableList(Arrays.asList(
-                NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
+                NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
                 RECORD_READER, RECORD_WRITER,
                 INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
                 STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
@@ -335,6 +331,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                     .valid(false)
                     .build());
         }
+        results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
         return results;
     }
 
@@ -347,9 +344,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
         }
     }
 
-    public class EventProcessorFactory implements IEventProcessorFactory {
+    public class EventProcessorFactory implements IEventProcessorFactory<EventProcessor> {
         @Override
-        public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
+        public EventProcessor createEventProcessor(PartitionContext context) throws Exception {
             final EventProcessor eventProcessor = new EventProcessor();
             return eventProcessor;
         }
@@ -581,12 +578,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
 
-        final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
-        validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
-
-        final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
-        validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
-
         final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
         validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
 
@@ -627,9 +618,22 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
         final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
 
-        final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder().setNamespaceName(namespaceName).setEventHubName( eventHubName).setSasKeyName(sasName).setSasKey(sasKey);
-
-        eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
+        final String connectionString;
+        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        if(useManagedIdentity) {
+            connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespaceName, eventHubName);
+        } else {
+            final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+            validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
+            final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
+            validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
+            connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey);
+        }
+        eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
+                                .newBuilder(consumerHostname, consumerGroupName)
+                                .useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null)
+                                .useEventHubConnectionString(connectionString, eventHubName)
+                                .build();
 
         options.setExceptionNotification(e -> {
             getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" +
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
index cb8c9ac..78a3327 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java
@@ -22,6 +22,7 @@ import java.net.URISyntaxException;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -54,6 +55,8 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -63,6 +66,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 
 @Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
 @CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. "
@@ -103,16 +107,10 @@ public class GetAzureEventHub extends AbstractProcessor {
             .description("The name of the shared access policy. This policy must have Listen claims.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .required(true)
-            .build();
-    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
-            .name("Shared Access Policy Primary Key")
-            .description("The primary key of the shared access policy")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .sensitive(true)
-            .required(true)
+            .required(false)
             .build();
+    static final PropertyDescriptor POLICY_PRIMARY_KEY =  AzureEventHubUtils.POLICY_PRIMARY_KEY;
+    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
 
     static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
             .name("Number of Event Hub Partitions")
@@ -163,6 +161,7 @@ public class GetAzureEventHub extends AbstractProcessor {
             .description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
             .build();
 
+
     private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
     private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
     private volatile Instant configuredEnqueueTime;
@@ -184,6 +183,7 @@ public class GetAzureEventHub extends AbstractProcessor {
         _propertyDescriptors.add(NAMESPACE);
         _propertyDescriptors.add(ACCESS_POLICY);
         _propertyDescriptors.add(POLICY_PRIMARY_KEY);
+        _propertyDescriptors.add(USE_MANAGED_IDENTITY);
         _propertyDescriptors.add(NUM_PARTITIONS);
         _propertyDescriptors.add(CONSUMER_GROUP);
         _propertyDescriptors.add(ENQUEUE_TIME);
@@ -207,10 +207,16 @@ public class GetAzureEventHub extends AbstractProcessor {
         return propertyDescriptors;
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
+        return retVal;
+    }
+
     protected void setupReceiver(final String connectionString, final ScheduledExecutorService executor) throws ProcessException {
         try {
-            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
-            eventHubClient = EventHubClient.createSync(connectionString, executor);
+            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
+            eventHubClient = EventHubClient.createFromConnectionStringSync(connectionString, executor);
         } catch (IOException | EventHubException e) {
             throw new ProcessException(e);
         }
@@ -301,11 +307,23 @@ public class GetAzureEventHub extends AbstractProcessor {
         }
         this.partitionNames = partitionNames;
 
-        final String policyName = context.getProperty(ACCESS_POLICY).getValue();
-        final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
         final String namespace = context.getProperty(NAMESPACE).getValue();
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
         final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+        final String connectionString;
+
+        if(useManagedIdentity){
+            connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace,eventHubName);
+        } else {
+            final String policyName = context.getProperty(ACCESS_POLICY).getValue();
+            final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+            connectionString = new ConnectionStringBuilder()
+                                    .setEndpoint(new URI("amqps://"+namespace+serviceBusEndpoint))
+                                    .setEventHubName(eventHubName)
+                                    .setSasKeyName(policyName)
+                                    .setSasKey(policyKey).toString();
+        }
 
         if(context.getProperty(ENQUEUE_TIME).isSet()) {
             configuredEnqueueTime = Instant.parse(context.getProperty(ENQUEUE_TIME).toString());
@@ -324,8 +342,6 @@ public class GetAzureEventHub extends AbstractProcessor {
         }
 
         executor = Executors.newScheduledThreadPool(4);
-        final String connectionString = new ConnectionStringBuilder().setEndpoint(
-            new URI("amqps://"+namespace+serviceBusEndpoint)).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
         setupReceiver(connectionString, executor);
     }
 
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index ec47384..51bb5bf 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -16,8 +16,10 @@
  */
 package org.apache.nifi.processors.azure.eventhub;
 
+import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -34,7 +36,6 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubClient;
 import com.microsoft.azure.eventhubs.EventHubException;
@@ -53,6 +54,8 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -91,16 +94,11 @@ public class PutAzureEventHub extends AbstractProcessor {
             .description("The name of the shared access policy. This policy must have Send claims.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .required(true)
-            .build();
-    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
-            .name("Shared Access Policy Primary Key")
-            .description("The primary key of the shared access policy")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .sensitive(true)
-            .required(true)
+            .required(false)
             .build();
+    static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
+    static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
+
     static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
             .name("partitioning-key-attribute-name")
             .displayName("Partitioning Key Attribute Name")
@@ -144,6 +142,7 @@ public class PutAzureEventHub extends AbstractProcessor {
         _propertyDescriptors.add(NAMESPACE);
         _propertyDescriptors.add(ACCESS_POLICY);
         _propertyDescriptors.add(POLICY_PRIMARY_KEY);
+        _propertyDescriptors.add(USE_MANAGED_IDENTITY);
         _propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
         _propertyDescriptors.add(MAX_BATCH_SIZE);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
@@ -178,6 +177,13 @@ public class PutAzureEventHub extends AbstractProcessor {
     }
 
     @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext context) {
+        List<ValidationResult> retVal = AzureEventHubUtils.customValidate(ACCESS_POLICY, POLICY_PRIMARY_KEY, context);
+        return retVal;
+    }
+
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         try {
             populateSenderQueue(context);
@@ -314,8 +320,15 @@ public class PutAzureEventHub extends AbstractProcessor {
             final int numThreads = context.getMaxConcurrentTasks();
             senderQueue = new LinkedBlockingQueue<>(numThreads);
             executor = Executors.newScheduledThreadPool(4);
-            final String policyName = context.getProperty(ACCESS_POLICY).getValue();
-            final String policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+            final boolean useManagedIdentiy = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+            final String policyName, policyKey;
+            if(useManagedIdentiy) {
+                policyName = AzureEventHubUtils.MANAGED_IDENTITY_POLICY;
+                policyKey =null;
+            } else {
+                policyName = context.getProperty(ACCESS_POLICY).getValue();
+                policyKey = context.getProperty(POLICY_PRIMARY_KEY).getValue();
+            }
             final String namespace = context.getProperty(NAMESPACE).getValue();
             final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
             for (int i = 0; i < numThreads; i++) {
@@ -345,8 +358,14 @@ public class PutAzureEventHub extends AbstractProcessor {
         throws ProcessException{
 
         try {
-            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/2.3.2";
-            return EventHubClient.createSync(getConnectionString(namespace, eventHubName, policyName, policyKey), executor);
+            EventHubClientImpl.USER_AGENT = "ApacheNiFi-azureeventhub/3.1.1";
+            final String connectionString;
+            if(policyName == AzureEventHubUtils.MANAGED_IDENTITY_POLICY) {
+                connectionString = AzureEventHubUtils.getManagedIdentityConnectionString(namespace, eventHubName);
+            } else{
+                connectionString = getConnectionString(namespace, eventHubName, policyName, policyKey);
+            }
+            return EventHubClient.createFromConnectionStringSync(connectionString, executor);
         } catch (IOException | EventHubException | IllegalConnectionStringFormatException e) {
             getLogger().error("Failed to create EventHubClient due to {}", new Object[]{e.getMessage()}, e);
             throw new ProcessException(e);
@@ -354,7 +373,7 @@ public class PutAzureEventHub extends AbstractProcessor {
     }
 
     protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
-        return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
+        return AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespace, eventHubName, policyName, policyKey);
     }
 
     /**
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
new file mode 100644
index 0000000..d95e27f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/utils/AzureEventHubUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureEventHubUtils {
+
+    public static final String MANAGED_IDENTITY_POLICY = ConnectionStringBuilder.MANAGED_IDENTITY_AUTHENTICATION;
+
+    public static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+        .name("Shared Access Policy Primary Key")
+        .description("The primary key of the shared access policy")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .sensitive(true)
+        .required(false)
+        .build();
+
+    public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
+        .name("use-managed-identity")
+        .displayName("Use Azure Managed Identity")
+        .description("Choose whether or not to use the managed identity of Azure VM/VMSS")
+        .required(false).defaultValue("false").allowableValues("true", "false")
+        .addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
+
+    public static List<ValidationResult> customValidate(PropertyDescriptor accessPolicyDescriptor,
+        PropertyDescriptor policyKeyDescriptor,
+        ValidationContext context) {
+        List<ValidationResult> retVal = new ArrayList<>();
+
+        boolean accessPolicyIsSet  = context.getProperty(accessPolicyDescriptor).isSet();
+        boolean policyKeyIsSet     = context.getProperty(policyKeyDescriptor).isSet();
+        boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
+
+        if (useManagedIdentity && (accessPolicyIsSet || policyKeyIsSet) ) {
+            final String msg = String.format(
+                "('%s') and ('%s' with '%s') fields cannot be set at the same time.",
+                USE_MANAGED_IDENTITY.getDisplayName(),
+                accessPolicyDescriptor.getDisplayName(),
+                POLICY_PRIMARY_KEY.getDisplayName()
+            );
+            retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
+        } else if (!useManagedIdentity && (!accessPolicyIsSet || !policyKeyIsSet)) {
+            final String msg = String.format(
+                "either('%s') or (%s with '%s') must be set",
+                USE_MANAGED_IDENTITY.getDisplayName(),
+                accessPolicyDescriptor.getDisplayName(),
+                POLICY_PRIMARY_KEY.getDisplayName()
+            );
+            retVal.add(new ValidationResult.Builder().subject("Credentials config").valid(false).explanation(msg).build());
+        }
+        return retVal;
+    }
+
+    public static String getManagedIdentityConnectionString(final String namespace, final String eventHubName){
+        return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName)
+                    .setAuthentication(MANAGED_IDENTITY_POLICY).toString();
+    }
+    public static String getSharedAccessSignatureConnectionString(final String namespace, final String eventHubName, final String sasName, final String sasKey) {
+        return new ConnectionStringBuilder()
+                    .setNamespaceName(namespace)
+                    .setEventHubName(eventHubName)
+                    .setSasKeyName(sasName)
+                    .setSasKey(sasKey).toString();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
index 6e55f60..88bcfa8 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java
@@ -78,7 +78,23 @@ public class GetAzureEventHubTest {
         testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
         testRunner.assertValid();
     }
-
+    @Test
+    public void testProcessorConfigValidityWithManagedIdentityFlag() {
+        testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true");
+        testRunner.assertNotValid();
+        testRunner.setProperty(GetAzureEventHub.NUM_PARTITIONS,"4");
+        testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
+        testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
+        testRunner.assertValid();
+        testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
+        testRunner.assertValid();
+    }
     @Test
     public void verifyRelationships(){
         assert(1 == processor.getRelationships().size());
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index 83ed649..183b80d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -89,6 +89,15 @@ public class PutAzureEventHubTest {
         testRunner.assertValid();
     }
     @Test
+    public void testProcessorConfigValidityWithManagedIdentityFlag() { // validity check that sets only the event hub name, the namespace and the managed identity flag
+        testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid(); // expected invalid with only the event hub name set by this test
+        testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid(); // expected to stay invalid until the managed identity flag is enabled below
+        testRunner.setProperty(PutAzureEventHub.USE_MANAGED_IDENTITY,"true");
+        testRunner.assertValid(); // name + namespace + USE_MANAGED_IDENTITY=true is expected to be a valid configuration
+    }
+    @Test
     public void verifyRelationships(){
         assert(2 == processor.getRelationships().size());
     }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index e7edef7..99541a5 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -23,6 +23,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
@@ -39,6 +40,8 @@ import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessSession;
 import org.apache.nifi.util.SharedSessionState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -64,8 +67,15 @@ import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 
 public class TestConsumeAzureEventHub {
+    private static final String namespaceName = "nifi-azure-hub"; // value given to the NAMESPACE property in the config validity test
+    private static final String eventHubName = "get-test"; // value given to the EVENT_HUB_NAME property in the config validity test
+    private static final String storageAccountName = "test-sa"; // value given to the STORAGE_ACCOUNT_NAME property in the config validity test
+    private static final String storageAccountKey = "test-sa-key"; // value given to the STORAGE_ACCOUNT_KEY property in the config validity test
+
     private ConsumeAzureEventHub.EventProcessor eventProcessor;
     private MockProcessSession processSession;
     private SharedSessionState sharedState;
@@ -97,7 +107,29 @@ public class TestConsumeAzureEventHub {
         when(partitionContext.getPartitionId()).thenReturn("partition-id");
         when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
     }
-
+    @Test
+    public void testProcessorConfigValidityWithManagedIdentityFlag() throws InitializationException { // validity check: the config is expected to become valid only once the managed identity flag is set
+        TestRunner testRunner = TestRunners.newTestRunner(processor); // local runner, created for this test only
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        final MockRecordParser reader = new MockRecordParser(); // mock record reader/writer services, registered and enabled before being referenced by id
+        final MockRecordWriter writer = new MockRecordWriter();
+        testRunner.addControllerService("writer", writer);
+        testRunner.enableControllerService(writer);
+        testRunner.addControllerService("reader", reader);
+        testRunner.enableControllerService(reader);
+        testRunner.setProperty(ConsumeAzureEventHub.RECORD_WRITER, "writer"); // the ids match the ones passed to addControllerService above
+        testRunner.setProperty(ConsumeAzureEventHub.RECORD_READER, "reader");
+        testRunner.assertNotValid(); // expected to stay invalid after the record reader/writer are configured
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
+        testRunner.assertNotValid(); // expected to stay invalid after the storage account settings are added
+
+        testRunner.setProperty(ConsumeAzureEventHub.USE_MANAGED_IDENTITY,"true");
+        testRunner.assertValid(); // with the managed identity flag set, the configuration built above is expected to be valid
+    }
     @Test
     public void testReceiveOne() throws Exception {
         final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));