You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/08/15 21:19:19 UTC
[nifi] branch main updated: NIFI-10300 Added AzureEventHubRecordSink
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eb419b6e40 NIFI-10300 Added AzureEventHubRecordSink
eb419b6e40 is described below
commit eb419b6e408c29c3454c6952c0d7eec31ab8ee74
Author: royalfork2 <na...@gmail.com>
AuthorDate: Thu Aug 11 09:40:17 2022 -0700
NIFI-10300 Added AzureEventHubRecordSink
This closes #6296
Signed-off-by: David Handermann <ex...@apache.org>
---
nifi-assembly/pom.xml | 6 +
.../nifi-azure-record-sink-nar/pom.xml | 45 ++++
.../nifi-azure-record-sink/pom.xml | 73 +++++++
.../eventhub/AzureAuthenticationStrategy.java | 49 +++++
.../azure/eventhub/AzureEventHubRecordSink.java | 242 +++++++++++++++++++++
.../org.apache.nifi.controller.ControllerService | 15 ++
.../eventhub/TestAzureEventHubRecordSink.java | 169 ++++++++++++++
nifi-nar-bundles/nifi-azure-bundle/pom.xml | 2 +
8 files changed, 601 insertions(+)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index c50e2ca7fe..e991bae734 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -514,6 +514,12 @@ language governing permissions and limitations under the License. -->
<version>1.18.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-record-sink-nar</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api-nar</artifactId>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml
new file mode 100644
index 0000000000..912ce0a1c6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink-nar/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-bundle</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>nifi-azure-record-sink-nar</artifactId> <!-- NAR module that bundles the nifi-azure-record-sink artifact listed under dependencies -->
+ <packaging>nar</packaging>
+ <properties>
+ <maven.javadoc.skip>true</maven.javadoc.skip> <!-- Javadoc generation is skipped for this module -->
+ <source.skip>true</source.skip> <!-- presumably skips building a sources artifact; confirm against the parent build configuration -->
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-azure-record-sink</artifactId> <!-- the module that contains AzureEventHubRecordSink -->
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-standard-services-api-nar</artifactId> <!-- NAR-type dependency; presumably provides the shared service API classes at runtime (TODO confirm) -->
+ <version>1.18.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml
new file mode 100644
index 0000000000..a8d6073937
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>nifi-azure-bundle</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <version>1.18.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>nifi-azure-record-sink</artifactId> <!-- module holding the Azure Event Hubs Record Sink service and its tests -->
+
+ <dependencies>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-messaging-eventhubs</artifactId>
+ <version>5.12.0</version> <!-- NOTE(review): version is pinned inline, unlike azure-identity below which uses a property; confirm whether it should be managed centrally -->
+ </dependency>
+ <dependency>
+ <groupId>com.azure</groupId>
+ <artifactId>azure-identity</artifactId>
+ <version>${azure.identity.version}</version> <!-- property is not defined in this file; presumably inherited from a parent POM -->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-sink-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-utils</artifactId>
+ <version>1.18.0-SNAPSHOT</version> <!-- explicit version, unlike the other org.apache.nifi dependencies in this file, which omit it -->
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId> <!-- test-scoped Mockito dependency -->
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-mock-record-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java
new file mode 100644
index 0000000000..65d40538ae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureAuthenticationStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import org.apache.nifi.components.DescribedValue;
+
+/**
+ * Strategies available for authenticating to Azure Event Hubs.
+ * Each constant carries a display name and a description, and reports its own
+ * constant name as the property value.
+ */
+public enum AzureAuthenticationStrategy implements DescribedValue {
+    /** Authenticate with a shared access policy name and key. */
+    SHARED_ACCESS_KEY(
+            "Shared Access Key",
+            "Azure Event Hub shared access key"
+    ),
+
+    /** Authenticate with the Default Azure Credential. */
+    DEFAULT_AZURE_CREDENTIAL(
+            "Default Azure Credential",
+            "The Default Azure Credential will read credentials from standard environment variables "
+                    + "and will also attempt to read Managed Identity credentials "
+                    + "when running in Microsoft Azure environments"
+    );
+
+    private final String label;
+    private final String summary;
+
+    AzureAuthenticationStrategy(final String label, final String summary) {
+        this.label = label;
+        this.summary = summary;
+    }
+
+    /**
+     * @return the enum constant name, used as the stored property value
+     */
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    /**
+     * @return the human-readable name of this strategy
+     */
+    @Override
+    public String getDisplayName() {
+        return label;
+    }
+
+    /**
+     * @return the explanatory text for this strategy
+     */
+    @Override
+    public String getDescription() {
+        return summary;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java
new file mode 100644
index 0000000000..53f09d5061
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/java/org/apache/nifi/services/azure/eventhub/AzureEventHubRecordSink.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.identity.DefaultAzureCredential;
+import com.azure.identity.DefaultAzureCredentialBuilder;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubClientBuilder;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Controller Service implementation of {@link RecordSinkService} that publishes Records to Azure Event Hubs.
+ * Each Record is serialized on its own with the configured Record Writer and wrapped in a single
+ * {@link EventData}. Events are collected in an {@link EventDataBatch}; a batch is sent when it cannot
+ * accept another event and once more after the last Record has been processed.
+ */
+@Tags({"azure", "record", "sink"})
+@CapabilityDescription("Format and send Records to Azure Event Hubs")
+public class AzureEventHubRecordSink extends AbstractControllerService implements RecordSinkService {
+
+    static final AllowableValue AZURE_ENDPOINT = new AllowableValue(".servicebus.windows.net","Azure", "Default Service Bus Endpoint");
+
+    static final AllowableValue AZURE_CHINA_ENDPOINT = new AllowableValue(".servicebus.chinacloudapi.cn", "Azure China", "China Service Bus Endpoint");
+
+    static final AllowableValue AZURE_GERMANY_ENDPOINT = new AllowableValue(".servicebus.cloudapi.de", "Azure Germany", "Germany Service Bus Endpoint");
+
+    static final AllowableValue AZURE_US_GOV_ENDPOINT = new AllowableValue(".servicebus.usgovcloudapi.net", "Azure US Government", "United States Government Endpoint");
+
+    // Domain suffix appended to the namespace to form the fully qualified namespace
+    static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
+            .name("Service Bus Endpoint")
+            .description("Provides the domain for connecting to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues(
+                    AZURE_ENDPOINT,
+                    AZURE_CHINA_ENDPOINT,
+                    AZURE_GERMANY_ENDPOINT,
+                    AZURE_US_GOV_ENDPOINT
+            )
+            .defaultValue(AZURE_ENDPOINT.getValue())
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor EVENT_HUB_NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Event Hub Namespace")
+            .description("Provides the host for connecting to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("Event Hub Name")
+            .description("Provides the Event Hub Name for connections")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor AUTHENTICATION_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Authentication Strategy")
+            .description("Strategy for authenticating to Azure Event Hubs")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .allowableValues(AzureAuthenticationStrategy.class)
+            .required(true)
+            .defaultValue(AzureAuthenticationStrategy.DEFAULT_AZURE_CREDENTIAL.getValue())
+            .build();
+
+    // Required together with the key below: both are handed to AzureNamedKeyCredential when the
+    // Shared Access Key strategy is selected. The dependsOn declaration limits the requirement to that strategy.
+    static final PropertyDescriptor SHARED_ACCESS_POLICY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy")
+            .description("The name of the shared access policy. This policy must have Send claims")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+
+    static final PropertyDescriptor SHARED_ACCESS_POLICY_KEY = new PropertyDescriptor.Builder()
+            .name("Shared Access Policy Key")
+            .description("The primary or secondary key of the shared access policy")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .required(true)
+            .dependsOn(AUTHENTICATION_STRATEGY, AzureAuthenticationStrategy.SHARED_ACCESS_KEY.getValue())
+            .build();
+
+    // Evaluated per sendData call against the attributes supplied by the caller
+    static final PropertyDescriptor PARTITION_KEY = new PropertyDescriptor.Builder()
+            .name("Partition Key")
+            .description("A hint for Azure Event Hub message broker how to distribute messages across one or more partitions")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(false)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(
+            Arrays.asList(
+                    SERVICE_BUS_ENDPOINT,
+                    EVENT_HUB_NAMESPACE,
+                    EVENT_HUB_NAME,
+                    RECORD_WRITER_FACTORY,
+                    AUTHENTICATION_STRATEGY,
+                    SHARED_ACCESS_POLICY,
+                    SHARED_ACCESS_POLICY_KEY,
+                    PARTITION_KEY
+            )
+    );
+
+    // Populated in onEnabled and read by sendData
+    private volatile ConfigurationContext context;
+    private RecordSetWriterFactory writerFactory;
+    private EventHubProducerClient client;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Build the producer client used to send event batches. Protected so that subclasses can supply
+     * a different client.
+     *
+     * @param namespace Event Hub namespace, used as the host portion of the fully qualified namespace
+     * @param serviceBusEndpoint domain suffix appended directly to the namespace
+     * @param eventHubName name of the Event Hub
+     * @param policyName shared access policy name, used only with the Shared Access Key strategy
+     * @param policyKey shared access policy key, used only with the Shared Access Key strategy
+     * @param authenticationStrategy selected strategy; any value other than SHARED_ACCESS_KEY uses the Default Azure Credential
+     * @return configured Event Hub producer client
+     */
+    protected EventHubProducerClient createEventHubClient(final String namespace,
+                                                          final String serviceBusEndpoint,
+                                                          final String eventHubName,
+                                                          final String policyName,
+                                                          final String policyKey,
+                                                          final AzureAuthenticationStrategy authenticationStrategy
+    ) {
+        final String fullyQualifiedNamespace = String.format("%s%s", namespace, serviceBusEndpoint);
+        final EventHubClientBuilder eventHubClientBuilder = new EventHubClientBuilder();
+        if (AzureAuthenticationStrategy.SHARED_ACCESS_KEY == authenticationStrategy) {
+            final AzureNamedKeyCredential azureNamedKeyCredential = new AzureNamedKeyCredential(policyName, policyKey);
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, azureNamedKeyCredential);
+        } else {
+            final DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder();
+            final DefaultAzureCredential defaultAzureCredential = defaultAzureCredentialBuilder.build();
+            eventHubClientBuilder.credential(fullyQualifiedNamespace, eventHubName, defaultAzureCredential);
+        }
+        return eventHubClientBuilder.buildProducerClient();
+    }
+
+    /**
+     * Capture the configuration, resolve the Record Writer and create the Event Hub producer client.
+     *
+     * @param context configuration context holding the property values of this service
+     */
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        this.context = context;
+        writerFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+        // None of the connection properties declares Expression Language support, so the configured values are read as-is
+        final String namespace = context.getProperty(EVENT_HUB_NAMESPACE).getValue();
+        final String serviceBusEndpoint = context.getProperty(SERVICE_BUS_ENDPOINT).getValue();
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+        final String policyName = context.getProperty(SHARED_ACCESS_POLICY).getValue();
+        final String policyKey = context.getProperty(SHARED_ACCESS_POLICY_KEY).getValue();
+        final String authenticationStrategy = context.getProperty(AUTHENTICATION_STRATEGY).getValue();
+        final AzureAuthenticationStrategy azureAuthenticationStrategy = AzureAuthenticationStrategy.valueOf(authenticationStrategy);
+        client = createEventHubClient(namespace, serviceBusEndpoint, eventHubName, policyName, policyKey, azureAuthenticationStrategy);
+    }
+
+    /**
+     * Close the producer client when one was created.
+     */
+    @OnDisabled
+    public void onDisabled() {
+        if (client == null) {
+            getLogger().debug("Event Hub Client not configured");
+        } else {
+            client.close();
+        }
+    }
+
+    /**
+     * Serialize each Record individually and send the resulting events to Azure Event Hubs in batches.
+     *
+     * @param recordSet Records to send
+     * @param attributes attributes used to evaluate the Partition Key, passed to the Record Writer, and copied to the properties of every event
+     * @param sendZeroResults not consulted by this implementation
+     * @return result containing the number of Records processed and a copy of the supplied attributes
+     * @throws IOException when the Record Schema cannot be found or the Record Writer fails
+     * @throws ProcessException when a single serialized Record does not fit into an empty batch
+     */
+    @Override
+    public WriteResult sendData(final RecordSet recordSet, Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
+        final Map<String, String> writeAttributes = new LinkedHashMap<>(attributes);
+        final String partitionKey = context.getProperty(PARTITION_KEY).evaluateAttributeExpressions(attributes).getValue();
+        final CreateBatchOptions createBatchOptions = new CreateBatchOptions();
+        createBatchOptions.setPartitionKey(partitionKey);
+        EventDataBatch eventDataBatch = client.createBatch(createBatchOptions);
+        // The uuid attribute, when present, correlates all events produced by this call
+        final String correlationId = writeAttributes.get(CoreAttributes.UUID.key());
+        int recordCount = 0;
+        try (
+                final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+                final RecordSetWriter writer = writerFactory.createWriter(getLogger(), recordSet.getSchema(), outputStream, attributes)
+        ) {
+            Record record;
+            final String contentType = writer.getMimeType();
+            while ((record = recordSet.next()) != null) {
+                writer.write(record);
+                writer.flush();
+                recordCount++;
+
+                // Take the bytes of this Record only and reset the buffer for the next one
+                final byte[] bytes = outputStream.toByteArray();
+                outputStream.reset();
+                final EventData eventData = new EventData(bytes);
+                eventData.getProperties().putAll(writeAttributes);
+                eventData.setContentType(contentType);
+                eventData.setCorrelationId(correlationId);
+                eventData.setMessageId(String.format("%s-%d", correlationId, recordCount));
+                if (!eventDataBatch.tryAdd(eventData)) {
+                    // Batch rejected the event: send what has been collected and retry with a new batch
+                    if (eventDataBatch.getCount() > 0) {
+                        client.send(eventDataBatch);
+                        eventDataBatch = client.createBatch(createBatchOptions);
+                    }
+                    if (!eventDataBatch.tryAdd(eventData)) {
+                        throw new ProcessException("Record " + recordCount + " exceeds maximum event data size: " + eventDataBatch.getMaxSizeInBytes());
+                    }
+                }
+            }
+            // Send the remaining events of the last batch
+            if (eventDataBatch.getCount() > 0) {
+                client.send(eventDataBatch);
+            }
+        } catch (final SchemaNotFoundException e) {
+            throw new IOException("Record Schema not found", e);
+        }
+        return WriteResult.of(recordCount, writeAttributes);
+    }
+}
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000000..2684da536e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.services.azure.eventhub.AzureEventHubRecordSink
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java
new file mode 100644
index 0000000000..8ab5595a6b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-record-sink/src/test/java/org/apache/nifi/services/azure/eventhub/TestAzureEventHubRecordSink.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.eventhub;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventDataBatch;
+import com.azure.messaging.eventhubs.EventHubProducerClient;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.NoOpProcessor;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+/**
+ * Unit tests for {@link AzureEventHubRecordSink} using a mocked {@link EventHubProducerClient} and
+ * {@link EventDataBatch}, so that no connection to Azure Event Hubs is attempted.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestAzureEventHubRecordSink {
+    private static final String EVENT_HUB_NAMESPACE = "namespace";
+    private static final String EVENT_HUB_NAME = "hub";
+    private static final String POLICY_KEY = "policyKey";
+    private static final String NULL_HEADER = null;
+    private static final String WRITER_IDENTIFIER = MockRecordWriter.class.getSimpleName();
+    private static final String IDENTIFIER = AzureEventHubRecordSink.class.getSimpleName();
+    private static final String ID_FIELD = "id";
+    private static final String ID_FIELD_VALUE = TestAzureEventHubRecordSink.class.getSimpleName();
+    private static final RecordSchema RECORD_SCHEMA = getRecordSchema();
+    private static final boolean SEND_ZERO_RESULTS = true;
+    // Any positive count makes the sink treat the mocked batch as containing events
+    private static final int BATCH_COUNT = 1;
+
+    @Mock
+    private EventHubProducerClient client;
+
+    @Mock
+    private EventDataBatch eventDataBatch;
+
+    private AzureEventHubRecordSink azureEventHubRecordSink;
+
+    /**
+     * Register and enable a mock Record Writer and the Record Sink under test before each test.
+     */
+    @BeforeEach
+    void setRunner() throws Exception {
+        final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
+        runner.setValidateExpressionUsage(false);
+
+        final MockRecordWriter recordWriter = new MockRecordWriter(NULL_HEADER, false);
+        runner.addControllerService(WRITER_IDENTIFIER, recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        azureEventHubRecordSink = new MockAzureEventHubRecordSink();
+        runner.addControllerService(IDENTIFIER, azureEventHubRecordSink);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAME, EVENT_HUB_NAME);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.EVENT_HUB_NAMESPACE, EVENT_HUB_NAMESPACE);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.SHARED_ACCESS_POLICY_KEY, POLICY_KEY);
+        runner.setProperty(azureEventHubRecordSink, AzureEventHubRecordSink.RECORD_WRITER_FACTORY, WRITER_IDENTIFIER);
+        runner.enableControllerService(azureEventHubRecordSink);
+    }
+
+    @Test
+    void testSendDataNoRecords() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA);
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(0, writeResult.getRecordCount());
+
+        verify(client, never()).send(any(EventDataBatch.class));
+    }
+
+    @Test
+    void testSendDataOneRecordException() {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(false);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
+        assertThrows(ProcessException.class, ()-> azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS));
+
+        verify(client, never()).send(any(EventDataBatch.class));
+    }
+
+    @Test
+    void testSendDataOneRecord() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
+        // Without a positive count the sink skips sending, leaving the send path untested
+        when(eventDataBatch.getCount()).thenReturn(BATCH_COUNT);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(1));
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(1, writeResult.getRecordCount());
+
+        verify(client, times(1)).send(eventDataBatch);
+    }
+
+    @Test
+    void testSendDataTwoRecords() throws IOException {
+        when(client.createBatch(any(CreateBatchOptions.class))).thenReturn(eventDataBatch);
+        when(eventDataBatch.tryAdd(isA(EventData.class))).thenReturn(true);
+        when(eventDataBatch.getCount()).thenReturn(BATCH_COUNT);
+
+        final RecordSet recordSet = RecordSet.of(RECORD_SCHEMA, getRecords(2));
+        final WriteResult writeResult = azureEventHubRecordSink.sendData(recordSet, Collections.emptyMap(), SEND_ZERO_RESULTS);
+
+        assertNotNull(writeResult);
+        assertEquals(2, writeResult.getRecordCount());
+
+        // Both events fit into the same batch, so it is sent exactly once after the last Record
+        verify(client, times(1)).send(eventDataBatch);
+    }
+
+    /**
+     * Record Sink that returns the mocked client of the enclosing test instead of building a real one.
+     */
+    public class MockAzureEventHubRecordSink extends AzureEventHubRecordSink {
+        @Override
+        protected EventHubProducerClient createEventHubClient(
+                final String namespace,
+                final String serviceBusEndpoint,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final AzureAuthenticationStrategy authenticationStrategy) {
+            return client;
+        }
+    }
+
+    private static RecordSchema getRecordSchema() {
+        final RecordField idField = new RecordField(ID_FIELD, RecordFieldType.STRING.getDataType());
+        return new SimpleRecordSchema(Collections.singletonList(idField));
+    }
+
+    // Builds an array holding the same single-field Record the requested number of times
+    private static Record[] getRecords(int numberOfRecords) {
+        final Map<String, Object> values = Collections.singletonMap(ID_FIELD, ID_FIELD_VALUE);
+        final Record record = new MapRecord(RECORD_SCHEMA, values);
+        final Record[] records = new Record[numberOfRecords];
+        Arrays.fill(records, record);
+        return records;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
index a00ad4db2e..f617b0a3cd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/pom.xml
@@ -41,6 +41,8 @@
<module>nifi-azure-nar</module>
<module>nifi-azure-services-api</module>
<module>nifi-azure-services-api-nar</module>
+ <module>nifi-azure-record-sink</module>
+ <module>nifi-azure-record-sink-nar</module>
</modules>
<dependencyManagement>