Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/11 21:06:42 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6296: NIFI-10300: Added AzureEventHubRecordSink

exceptionfactory commented on code in PR #6296:
URL: https://github.com/apache/nifi/pull/6296#discussion_r943929811


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.3.3</version>

Review Comment:
   This version number should be changed to reference the `${mockito.version}` property from the root Maven configuration.
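
   A minimal sketch of the requested change, assuming the root Maven configuration defines `mockito.version` as described above. The `test` scope is taken from the later revision of this same dependency:
   ```xml
   <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-inline</artifactId>
       <!-- Assumes mockito.version is inherited from the root pom.xml -->
       <version>${mockito.version}</version>
       <scope>test</scope>
   </dependency>
   ```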



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>

Review Comment:
   The `compile` scope is the default, so it can be removed from this dependency and other dependencies.
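
   For illustration, the `nifi-record` dependency with the redundant element dropped might look like this. The version is assumed to be managed by a parent POM, since the diff does not declare one:
   ```xml
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-record</artifactId>
       <!-- No <scope> element: Maven applies the default compile scope -->
   </dependency>
   ```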



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>

Review Comment:
   These two dependencies need to have the `scope` set to `test` to avoid inclusion in the runtime NAR.
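
   A sketch of the two declarations with the scope added, assuming their versions remain managed by a parent POM as in the diff:
   ```xml
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-mock</artifactId>
       <!-- test scope keeps the artifact out of the runtime NAR -->
       <scope>test</scope>
   </dependency>
   <dependency>
       <groupId>org.apache.nifi</groupId>
       <artifactId>nifi-mock-record-utils</artifactId>
       <scope>test</scope>
   </dependency>
   ```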



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>

Review Comment:
   This dependency can be removed.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-identity</artifactId>
+            <version>${azure.identity.version}</version>
+        </dependency>

Review Comment:
   Recommend moving this dependency right after `azure-messaging-eventhubs`.
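
   One possible ordering that groups the two `com.azure` dependencies at the top of the `dependencies` section, with the declarations copied unchanged from the diff:
   ```xml
   <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-messaging-eventhubs</artifactId>
       <version>5.12.0</version>
   </dependency>
   <!-- Moved up so the Azure SDK dependencies sit together -->
   <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-identity</artifactId>
       <version>${azure.identity.version}</version>
   </dependency>
   ```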



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum AzureAuthenticationStrategy implements DescribedValue {
+    SHARED_ACCESS_KEY("Shared Access Key", "Azure Event Hub shared access key"),
+    DEFAULT_AZURE_CREDENTIAL("Default Azure Credential", "Azure Event Hub token credential");

Review Comment:
   The description should include some additional details. Based on the [Default Azure Credential](https://docs.microsoft.com/en-us/azure/developer/java/sdk/identity-azure-hosted-auth#default-azure-credential) documentation, I recommend the following wording:
   ```suggestion
       DEFAULT_AZURE_CREDENTIAL("Default Azure Credential", "The Default Azure Credential will read credentials from standard environment variables and will also attempt to read Managed Identity credentials when running in Microsoft Azure environments");
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
+
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("The name of the event hub to send to")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("authentication-strategy")
+            .displayName("Authentication Strategy")
+            .description("The strategy used to authenticate event hub: Default Azure Credentials or Shared Access Key")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+    public static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    public static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    EVENT_HUB_NAME,
+                    NAMESPACE,
+                    SERVICE_BUS_ENDPOINT,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    AUTHENTICATION_STRATEGY,

Review Comment:
   The `AUTHENTICATION_STRATEGY` should be moved before the `SHARED_ACCESS` properties:
   ```suggestion
                       AUTHENTICATION_STRATEGY,
                       SHARED_ACCESS_POLICY,
                       SHARED_ACCESS_POLICY_KEY,
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
+
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("The name of the event hub to send to")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("authentication-strategy")
+            .displayName("Authentication Strategy")
+            .description("The strategy used to authenticate event hub: Default Azure Credentials or Shared Access Key")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+    public static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    public static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    EVENT_HUB_NAME,
+                    NAMESPACE,
+                    SERVICE_BUS_ENDPOINT,

Review Comment:
   `SERVICE_BUS_ENDPOINT` should be listed before the Event Hub Name and Namespace properties:
   ```suggestion
                       SERVICE_BUS_ENDPOINT,
                       EVENT_HUB_NAME,
                       NAMESPACE,
   ```
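
For illustration only: assuming properties are presented in the order returned by `getSupportedPropertyDescriptors()`, the position of each entry in this list is the position a user sees. The sketch below uses plain strings as stand-ins for the `PropertyDescriptor` constants; `PropertyOrderSketch` and `orderedNames()` are hypothetical names that are not part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PropertyOrderSketch {
    // Hypothetical stand-ins for the PropertyDescriptor constants in the PR.
    static final String SERVICE_BUS_ENDPOINT = "Service Bus Endpoint";
    static final String EVENT_HUB_NAME = "Event Hub Name";
    static final String NAMESPACE = "Event Hub Namespace";

    // Arrays.asList keeps argument order, and the unmodifiable wrapper
    // preserves it, so the declaration order here is the iteration order.
    static List<String> orderedNames() {
        return Collections.unmodifiableList(
                Arrays.asList(
                        SERVICE_BUS_ENDPOINT,
                        EVENT_HUB_NAME,
                        NAMESPACE
                )
        );
    }

    public static void main(String[] args) {
        // Prints the names in the suggested order, endpoint first.
        orderedNames().forEach(System.out::println);
    }
}
```

Listing the endpoint first keeps the property that selects the domain suffix ahead of the namespace it is appended to.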



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestAzureEventHubRecordSink {
+    private static final String namespaceName = "nifi-azure-hub";
+    private static final String eventHubName = "get-test";
+    private static final String sasKey = "policyKey";
+    private AzureEventHubRecordSink azureEventHubRecordSink;
+    private static final String NULL_HEADER = null;
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+    private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final boolean SEND_ZERO_RESULTS = true;
+    @Mock
+    private EventHubProducerClient client;
+    @Mock
+    private EventDataBatch eventDataBatch;
+
+
+    @BeforeEach
+    void setRunner() throws Exception {
+        final org.apache.nifi.util.TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);

Review Comment:
   The fully qualified class name should be replaced with an import of `org.apache.nifi.util.TestRunner` and the simple class name:
   ```suggestion
           final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
+
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("The name of the event hub to send to")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("authentication-strategy")
+            .displayName("Authentication Strategy")
+            .description("The strategy used to authenticate event hub: Default Azure Credentials or Shared Access Key")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+    public static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    public static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    EVENT_HUB_NAME,
+                    NAMESPACE,
+                    SERVICE_BUS_ENDPOINT,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    AUTHENTICATION_STRATEGY,
+                    PARTITION_KEY,
+                    RECORD_WRITER_FACTORY
+            )
+    );
+
+    private volatile ConfigurationContext context;
+    private RecordSetWriterFactory writerFactory;
+    private EventHubProducerClient client;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    protected EventHubProducerClient createEventHubClient(final String namespace, final String serviceBusEndpoint, final String eventHubName,
+                                                          final String policyName, final String policyKey, final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+        final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+        final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+        final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+        } else {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
+        }
+        return eventHubClientBuilder.buildProducerClient();
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final String namespace = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
+        final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue();
+        final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
+        final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
+        final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
+        client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        if (client == null) {
+            getLogger().debug("Event Sender not configured");

Review Comment:
   The log message should refer to the Event Hub Client rather than an Event Sender:
   ```suggestion
               getLogger().debug("Event Hub Client not configured");
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestAzureEventHubRecordSink {
+    private static final String namespaceName = "nifi-azure-hub";
+    private static final String eventHubName = "get-test";
+    private static final String sasKey = "policyKey";
+    private AzureEventHubRecordSink azureEventHubRecordSink;
+    private static final String NULL_HEADER = null;
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+    private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final boolean SEND_ZERO_RESULTS = true;
+    @Mock
+    private EventHubProducerClient client;
+    @Mock
+    private EventDataBatch eventDataBatch;
+
+
+    @BeforeEach
+    void setRunner() throws Exception {
+        final org.apache.nifi.util.TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+        runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        azureEventHubRecordSink = new MockAzureEventHubRecordSink();
+        runner.addControllerService(IDENTIFIER, azureEventHubRecordSink);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, eventHubName);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.NAMESPACE, namespaceName);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, sasKey);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+        runner.enableControllerService(azureEventHubRecordSink);
+    }
+    public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink {
+
+        @Override
+        protected EventHubProducerClient createEventHubClient(
+                final String namespace,
+                final String serviceBusEndpoint,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+            return client;
+        }
+    }
+
+    private static RecordSchema getRecordSchema() {
+        final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+        return new SimpleRecordSchema(Collections.singletonList(idField));
+    }
+
+    private static Record[] getRecords(int numberOfRecords) {
+        final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+        final Record record = new MapRecord(RECORD_SCHEMA, values);
+        final Record[] records = new Record[numberOfRecords];
+        Arrays.fill(records, record);
+        return records;
+    }

Review Comment:
   Recommend moving the `MockAzureEventHubRecordSink` class and the static `getRecordSchema()` and `getRecords()` methods to the end of the test class, after the test methods.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestAzureEventHubRecordSink {
+    private static final String namespaceName = "nifi-azure-hub";
+    private static final String eventHubName = "get-test";
+    private static final String sasKey = "policyKey";
+    private AzureEventHubRecordSink azureEventHubRecordSink;
+    private static final String NULL_HEADER = null;
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+    private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final boolean SEND_ZERO_RESULTS = true;
+    @Mock
+    private EventHubProducerClient client;
+    @Mock
+    private EventDataBatch eventDataBatch;
+
+
+    @BeforeEach
+    void setRunner() throws Exception {
+        final org.apache.nifi.util.TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+        runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        azureEventHubRecordSink = new MockAzureEventHubRecordSink();
+        runner.addControllerService(IDENTIFIER, azureEventHubRecordSink);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, eventHubName);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.NAMESPACE, namespaceName);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, sasKey);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+        runner.enableControllerService(azureEventHubRecordSink);
+    }
+    public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink {
+
+        @Override
+        protected EventHubProducerClient createEventHubClient(
+                final String namespace,
+                final String serviceBusEndpoint,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+            return client;
+        }
+    }
+
+    private static RecordSchema getRecordSchema() {
+        final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+        return new SimpleRecordSchema(Collections.singletonList(idField));
+    }
+
+    private static Record[] getRecords(int numberOfRecords) {
+        final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+        final Record record = new MapRecord(RECORD_SCHEMA, values);
+        final Record[] records = new Record[numberOfRecords];
+        Arrays.fill(records, record);
+        return records;
+    }
+
+    @Test
+    void testSendDataNoRecords() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        Assertions.assertNotNull(writeResult);
+        Assertions.assertEquals(0, writeResult.getRecordCount());
+
+        Mockito.verify(client, Mockito.times(0)).send(Mockito.any(EventDataBatch.class));

Review Comment:
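   Recommend replacing the `Assertions` and `Mockito` class references with static imports for improved readability. This requires static imports of `assertNotNull` and `assertEquals` from `org.junit.jupiter.api.Assertions`, and of `verify` and `times` from `org.mockito.Mockito`; `any` is already imported statically: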
   ```suggestion
           assertNotNull(writeResult);
           assertEquals(0, writeResult.getRecordCount());
   
           verify(client, times(0)).send(any(EventDataBatch.class));
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
+
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("The name of the event hub to send to")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("authentication-strategy")
+            .displayName("Authentication Strategy")
+            .description("The strategy used to authenticate event hub: Default Azure Credentials or Shared Access Key")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+    public static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    public static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    EVENT_HUB_NAME,
+                    NAMESPACE,
+                    SERVICE_BUS_ENDPOINT,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    AUTHENTICATION_STRATEGY,
+                    PARTITION_KEY,
+                    RECORD_WRITER_FACTORY
+            )
+    );
+
+    private volatile ConfigurationContext context;
+    private RecordSetWriterFactory writerFactory;
+    private EventHubProducerClient client;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    protected EventHubProducerClient createEventHubClient(final String namespace, final String serviceBusEndpoint, final String eventHubName,
+                                                          final String policyName, final String policyKey, final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+        final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+        final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+        final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+        } else {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
+        }
+        return eventHubClientBuilder.buildProducerClient();
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final String namespace = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
+        final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue();
+        final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
+        final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
+        final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
+        client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        if (client == null) {
+            getLogger().debug("Event Sender not configured");
+        } else {
+            client.close();
+        }
+    }
+
+    // put all records in one event, then see capacity of event

Review Comment:
   This inline comment should be changed to a JavaDoc comment or removed.
   ```suggestion
       /**
        * Send each Record as an Event Data object
        */
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml:
##########
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-properties</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-identity</artifactId>
+            <version>${azure.identity.version}</version>
+        </dependency>
+    </dependencies>
+
+

Review Comment:
   These empty lines can be removed.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    public static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Servicebus endpoint for general use");
+    public static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "Servicebus endpoint for China");
+    public static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Servicebus endpoint for Germany");
+    public static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "Servicebus endpoint for US Government");
+
+    public static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("To support namespaces not in the default windows.net domain.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AZURE_ENDPOINT, AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT, AZURE_US_GOV_ENDPOINT)
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("The name of the event hub to send to")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(true)
+            .build();
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("authentication-strategy")
+            .displayName("Authentication Strategy")
+            .description("The strategy used to authenticate event hub: Default Azure Credentials or Shared Access Key")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+    public static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+    public static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    EVENT_HUB_NAME,
+                    NAMESPACE,
+                    SERVICE_BUS_ENDPOINT,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    AUTHENTICATION_STRATEGY,
+                    PARTITION_KEY,
+                    RECORD_WRITER_FACTORY
+            )
+    );
+
+    private volatile ConfigurationContext context;
+    private RecordSetWriterFactory writerFactory;
+    private EventHubProducerClient client;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    protected EventHubProducerClient createEventHubClient(final String namespace, final String serviceBusEndpoint, final String eventHubName,
+                                                          final String policyName, final String policyKey, final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+        final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+        final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+        final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+        } else {
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
+        }

Review Comment:
   The credential objects should be built inside the conditionals so that only the credential for the selected authentication strategy is created.
   ```suggestion
           final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
           if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
               final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
               eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
           } else {
               final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
               final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
               eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
           }
   ```
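
   As a rough, dependency-free illustration of the same idea: the sketch below uses hypothetical stand-ins (`Strategy`, `NamedKeyCredential`, `selectCredential`) rather than the Azure SDK classes, and assumes the named-key credential rejects null arguments. Under that assumption, building the credential only inside its branch means unset policy properties cause no failure when the default strategy is selected.

   ```java
   import java.util.Objects;

   public class CredentialSelectionSketch {

       // Hypothetical stand-in for AzureAuthenticationStrategy in the PR
       enum Strategy { SHARED_ACCESS_KEY, DEFAULT_AZURE_CREDENTIAL }

       // Hypothetical stand-in for a named-key credential; assumed to reject null arguments
       static final class NamedKeyCredential {
           final String name;
           final String key;

           NamedKeyCredential(final String name, final String key) {
               this.name = Objects.requireNonNull(name, "name is required");
               this.key = Objects.requireNonNull(key, "key is required");
           }
       }

       // Builds the named-key credential only when its strategy is selected
       static String selectCredential(final Strategy strategy, final String policyName, final String policyKey) {
           if (Strategy.SHARED_ACCESS_KEY == strategy) {
               final NamedKeyCredential credential = new NamedKeyCredential(policyName, policyKey);
               return "named-key:" + credential.name;
           } else {
               // The policy name and key are never touched on this path
               return "default";
           }
       }

       public static void main(final String[] args) {
           // Policy properties are unset (null) under the default strategy
           System.out.println(selectCredential(Strategy.DEFAULT_AZURE_CREDENTIAL, null, null));
           System.out.println(selectCredential(Strategy.SHARED_ACCESS_KEY, "send-policy", "secret"));
       }
   }
   ```

   With the credential constructed before the conditional, the first call in `main` would fail in this sketch even though the shared access key is not in use.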


