You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/08/15 21:19:19 UTC

[nifi] branch main updated: NIFI-10300 Added AzureEventHubRecordSink

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eb419b6e40 NIFI-10300 Added AzureEventHubRecordSink
eb419b6e40 is described below

commit eb419b6e408c29c3454c6952c0d7eec31ab8ee74
Author: royalfork2 <na...@gmail.com>
AuthorDate: Thu Aug 11 09:40:17 2022 -0700

    NIFI-10300 Added AzureEventHubRecordSink
    
    This closes #6296
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 nifi-assembly/pom.xml                              |   6 +
 .../nifi-azure-record-sink-nar/pom.xml             |  45 ++++
 .../nifi-azure-record-sink/pom.xml                 |  73 +++++++
 .../eventhub/AzureAuthenticationStrategy.java      |  49 +++++
 .../azure/eventhub/AzureEventHubRecordSink.java    | 242 +++++++++++++++++++++
 .../org.apache.nifi.controller.ControllerService   |  15 ++
 .../eventhub/TestAzureEventHubRecordSink.java      | 169 ++++++++++++++
 nifi-nar-bundles/nifi-azure-bundle/pom.xml         |   2 +
 8 files changed, 601 insertions(+)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index c50e2ca7fe..e991bae734 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -514,6 +514,12 @@ language governing permissions and limitations under the License. -->
             <version>1.18.0-SNAPSHOT</version>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-record-sink-nar</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-azure-services-api-nar</artifactId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml
new file mode 100644
index 0000000000..912ce0a1c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-azure-record-sink-nar</artifactId>
+    <packaging>nar</packaging>
+    <properties>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <source.skip>true</source.skip>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-azure-record-sink</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+            <type>nar</type>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml
new file mode 100644
index 0000000000..a8d6073937
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>nifi-azure-bundle</artifactId>
+        <groupId>org.apache.nifi</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>nifi-azure-record-sink</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-messaging-eventhubs</artifactId>
+            <version>5.12.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.azure</groupId>
+            <artifactId>azure-identity</artifactId>
+            <version>${azure.identity.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-sink-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java
new file mode 100644
index 0000000000..65d40538ae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum AzureAuthenticationStrategy implements DescribedValue {
+    SHARED_ACCESS_KEY("Shared Access Key", "Azure Event Hub shared access key"),
+    DEFAULT_AZURE_CREDENTIAL("Default Azure Credential", "The Default Azure Credential " +
+            "will read credentials from standard environment variables and will also attempt to read " +
+            "Managed Identity credentials when running in Microsoft Azure environments");
+
+    private final String displayName;
+    private final String description;
+
+    AzureAuthenticationStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java
new file mode 100644
index 0000000000..53f09d5061
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint");
+
+    static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "China Service Bus Endpoint");
+
+    static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Germany Service Bus Endpoint");
+
+    static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "United States Government Endpoint");
+
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("Provides the domain for connecting to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(
+                    AZURE_ENDPOINT,
+                    AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT,
+                    AZURE_US_GOV_ENDPOINT
+            )
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor EVENT_HUB_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("Provides provides the host for connecting to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("Provides the Event Hub Name for connections")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Authentication Strategy")
+            .description("Strategy for authenticating to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+
+    static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary or secondary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .required(false)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+
+    static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Azure Event Hub message broker how to distribute messages across one or more partitions")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    SERVICE_BUS_ENDPOINT,
+                    EVENT_HUB_NAMESPACE,
+                    EVENT_HUB_NAME,
+                    RECORD_WRITER_FACTORY,
+                    AUTHENTICATION_STRATEGY,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    PARTITION_KEY
+            )
+    );
+
+    private volatile ConfigurationContext context;
+    private RecordSetWriterFactory writerFactory;
+    private EventHubProducerClient client;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    protected EventHubProducerClient createEventHubClient(final String namespace,
+                                                          final String serviceBusEndpoint,
+                                                          final String eventHubName,
+                                                          final String policyName,
+                                                          final String policyKey,
+                                                          final AzureAuthenticationStrategy authenticationStrategy
+    ) {
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
+            final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+        } else {
+            final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+            final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
+        }
+        return eventHubClientBuilder.buildProducerClient();
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        final String namespace = context.getProperty(EVENT_HUB_NAMESPACE).evaluateAttributeExpressions().getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).evaluateAttributeExpressions().getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
+        final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue();
+        final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
+        final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
+        final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
+        client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
+    }
+
+    @OnDisabled
+    public void onDisabled() {
+        if (client == null) {
+            getLogger().debug("Event Hub Client not configured");
+        } else {
+            client.close();
+        }
+    }
+
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
+        final String partitionKey = context.getProperty(PARTITION_KEY).evaluateAttributeExpressions(attributes).getValue();
+        final CreateBatchOptions createBatchOptions = new CreateBatchOptions();
+        createBatchOptions.setPartitionKey(partitionKey);
+        EventDataBatch eventDataBatch = client.createBatch(createBatchOptions);
+        final String correlationId = writeAttributes.get(CoreAttributes.UUID.key());
+        int recordCount = 0;
+        try (
+                final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, attributes)
+        ) {
+            Record record;
+            final String contentType = writer.getMimeType();
+            while ((record = recordSet.next()) != null) {
+                writer.write(record);
+                writer.flush();
+                recordCount++;
+
+                final byte[] bytes = outputStream.toByteArray();
+                outputStream.reset();
+                final EventData eventData = new EventData(bytes);
+                eventData.getProperties().putAll(writeAttributes);
+                eventData.setContentType(contentType);
+                eventData.setCorrelationId(correlationId);
+                eventData.setMessageId(String.format("%s-%d", correlationId, recordCount));
+                if (!eventDataBatch.tryAdd(eventData)) {
+                    if (eventDataBatch.getCount() > 0) {
+                        client.send(eventDataBatch);
+                        eventDataBatch = client.createBatch(createBatchOptions);
+                    }
+                    if (!eventDataBatch.tryAdd(eventData)) {
+                        throw new ProcessException("Record " + recordCount + " exceeds maximum event data size: " + eventDataBatch.getMaxSizeInBytes());
+                    }
+                }
+            }
+            if (eventDataBatch.getCount() > 0) {
+                client.send(eventDataBatch);
+            }
+        } catch (final SchemaNotFoundException e) {
+            throw new IOException("Record Schema not found", e);
+        }
+        return WriteResult.of(recordCount, writeAttributes);
+    }
+}
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..2684da536e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java
new file mode 100644
index 0000000000..8ab5595a6b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+@ExtendWith(MockitoExtension.class)
+public class TestAzureEventHubRecordSink {
+    private static final String EVENT_HUB_NAMESPACE = "namespace";
+    private static final String EVENT_HUB_NAME = "hub";
+    private static final String POLICY_KEY = "policyKey";
+    private static final String NULL_HEADER = null;
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+    private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final boolean SEND_ZERO_RESULTS = true;
+
+    @Mock
+    private EventHubProducerClient client;
+
+    @Mock
+    private EventDataBatch eventDataBatch;
+
+    private AzureEventHubRecordSink azureEventHubRecordSink;
+
+    @BeforeEach
+    void setRunner() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+        runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        azureEventHubRecordSink = new MockAzureEventHubRecordSink();
+        runner.addControllerService(IDENTIFIER, azureEventHubRecordSink);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, EVENT_HUB_NAME);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+        runner.enableControllerService(azureEventHubRecordSink);
+    }
+
+    @Test
+    void testSendDataNoRecords() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(0, writeResult.getRecordCount());
+
+        verify(client, times(0)).send(any(EventDataBatch.class));
+    }
+
+    @Test
+    void testSendDataOneRecordException() {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(false);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
+        assertThrows(ProcessException.class, ()-> azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS));
+
+        verify(client, never()).send(any(EventDataBatch.class));
+    }
+
+    @Test
+    void testSendDataOneRecord() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(1, writeResult.getRecordCount());
+    }
+
+    @Test
+    void testSendDataTwoRecords() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(2));
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(2, writeResult.getRecordCount());
+    }
+
+    public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink {
+        @Override
+        protected EventHubProducerClient createEventHubClient(
+                final String namespace,
+                final String serviceBusEndpoint,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final AzureAuthenticationStrategy authenticationStrategy) throws ProcessException {
+            return client;
+        }
+    }
+
+    private static RecordSchema getRecordSchema() {
+        final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+        return new SimpleRecordSchema(Collections.singletonList(idField));
+    }
+
+    private static Record[] getRecords(int numberOfRecords) {
+        final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+        final Record record = new MapRecord(RECORD_SCHEMA, values);
+        final Record[] records = new Record[numberOfRecords];
+        Arrays.fill(records, record);
+        return records;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index a00ad4db2e..f617b0a3cd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -41,6 +41,8 @@
         <module>nifi-azure-nar</module>
         <module>nifi-azure-services-api</module>
         <module>nifi-azure-services-api-nar</module>
+        <module>nifi-azure-record-sink</module>
+        <module>nifi-azure-record-sink-nar</module>
     </modules>
 
     <dependencyManagement>