You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/08/09 10:59:26 UTC

[nifi] branch main updated: NIFI-8668 ConsumeAzureEventHub NiFi processors need to support storage SAS token authentication

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4623063  NIFI-8668 ConsumeAzureEventHub NiFi processors need to support storage SAS token authentication
4623063 is described below

commit 462306369f6cb041cdc4aa63d8579030aafa65ce
Author: Timea Barna <ti...@gmail.com>
AuthorDate: Tue Jun 8 07:56:38 2021 +0200

    NIFI-8668 ConsumeAzureEventHub NiFi processors need to support storage SAS token authentication
    
    This closes #5136.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/processor/util/StandardValidators.java    | 35 ++++++++-
 .../util/validator/TestStandardValidators.java     | 90 ++++++++++++++++++++++
 .../azure/eventhub/ConsumeAzureEventHub.java       | 64 ++++++++++++---
 .../azure/eventhub/TestConsumeAzureEventHub.java   | 73 ++++++++++++++++--
 4 files changed, 243 insertions(+), 19 deletions(-)

diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
index fd53401..1dad5d4 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java
@@ -645,28 +645,55 @@ public class StandardValidators {
                 return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
             }
         };
-
     }
 
+
     public static Validator createRegexMatchingValidator(final Pattern pattern) {
+        return createRegexMatchingValidator(pattern, false, "Value does not match regular expression: " + pattern.pattern());
+    }
+
+    public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions, final String validationMessage) {
         return new Validator() {
             @Override
             public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+                String value = input;
                 if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
-                    return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
+                    if (evaluateExpressions) {
+                        try {
+                            value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+                        } catch (final Exception e) {
+                            return new ValidationResult.Builder()
+                                    .subject(subject)
+                                    .input(input)
+                                    .valid(false)
+                                    .explanation("Failed to evaluate the Attribute Expression Language due to " + e.toString())
+                                    .build();
+                        }
+                    } else {
+                        return new ValidationResult.Builder()
+                                .subject(subject)
+                                .input(input)
+                                .explanation("Expression Language Present")
+                                .valid(true)
+                                .build();
+                    }
                 }
 
-                final boolean matches = pattern.matcher(input).matches();
+                final boolean matches = value != null && pattern.matcher(value).matches();
                 return new ValidationResult.Builder()
                         .input(input)
                         .subject(subject)
                         .valid(matches)
-                        .explanation(matches ? null : "Value does not match regular expression: " + pattern.pattern())
+                        .explanation(matches ? null : validationMessage)
                         .build();
             }
         };
     }
 
+    public static Validator createRegexMatchingValidator(final Pattern pattern, final boolean evaluateExpressions) {
+        return createRegexMatchingValidator(pattern, evaluateExpressions, "Value does not match regular expression: " + pattern.pattern());
+    }
+
     /**
      * Creates a @{link Validator} that ensure that a value is a valid Java
      * Regular Expression with at least <code>minCapturingGroups</code>
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
index c82c64e..69bbe9b 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java
@@ -16,13 +16,19 @@
  */
 package org.apache.nifi.util.validator;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
@@ -523,4 +529,88 @@ public class TestStandardValidators {
         vr = val.validate("foo", "http://localhost , https://host2:8080 ", vc);
         assertTrue(vr.isValid());
     }
+
+    @Test
+    public void testRegexMatchingValidatorWithoutEL() {
+        Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"));
+        ValidationContext vc = mock(ValidationContext.class);
+        when(vc.isExpressionLanguagePresent(any())).thenReturn(false);
+        when(vc.isExpressionLanguageSupported(any())).thenReturn(false);
+
+        validatePropertyIsInvalid(val, null, vc);
+
+        validatePropertyIsInvalid(val, "", vc);
+
+        validatePropertyIsInvalid(val, "invalid string", vc);
+
+        validatePropertyIsValid(val, "?valid string", vc);
+    }
+
+    @Test
+    public void testRegexMatchingValidatorWithEL() {
+        Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), true);
+        ValidationContext vc = mock(ValidationContext.class);
+        when(vc.isExpressionLanguagePresent(any())).thenReturn(true);
+        when(vc.isExpressionLanguageSupported(any())).thenReturn(true);
+
+        validatePropertyWithELIsInvalid(val, null, vc);
+
+        validatePropertyWithELIsInvalid(val, "", vc);
+
+        validatePropertyWithELIsInvalid(val, "invalid string", vc);
+
+        validatePropertyWithELIsValid(val, "?valid string", vc);
+    }
+
+    @Test
+    public void testRegexMatchingValidatorWithELError() {
+        Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), true);
+        ValidationContext vc = mock(ValidationContext.class);
+        when(vc.isExpressionLanguagePresent(any())).thenReturn(true);
+        when(vc.isExpressionLanguageSupported(any())).thenReturn(true);
+
+        ValidationResult vr = val.validate("foo", "invalid", vc);
+        assertFalse(vr.isValid());
+        assertThat(vr.getExplanation(), containsString("Failed to evaluate the Attribute Expression Language"));
+    }
+
+    @Test
+    public void testRegexMatchingValidatorWithELWithoutEvaluation() {
+        Validator val = StandardValidators.createRegexMatchingValidator(Pattern.compile("^\\?.*$"), false);
+        ValidationContext vc = mock(ValidationContext.class);
+        when(vc.isExpressionLanguagePresent(any())).thenReturn(true);
+        when(vc.isExpressionLanguageSupported(any())).thenReturn(true);
+
+        ValidationResult vr = val.validate("foo", "valid", vc);
+        assertTrue(vr.isValid());
+        assertEquals("Expression Language Present", vr.getExplanation());
+    }
+
+    private void validatePropertyIsValid(final Validator val, final String input, final ValidationContext vc) {
+        ValidationResult vr = val.validate("foo", input, vc);
+        assertTrue(vr.isValid());
+    }
+
+    private void validatePropertyIsInvalid(final Validator val, final String input, final ValidationContext vc) {
+        ValidationResult vr = val.validate("foo", input, vc);
+        assertFalse(vr.isValid());
+    }
+
+    private void validatePropertyWithELIsValid(Validator val, String input, ValidationContext vc) {
+        PropertyValue property = mock(PropertyValue.class);
+        when(vc.newPropertyValue(input)).thenReturn(property);
+        when(property.evaluateAttributeExpressions()).thenReturn(property);
+        when(property.evaluateAttributeExpressions().getValue()).thenReturn(input);
+        ValidationResult vr = val.validate("foo", input, vc);
+        assertTrue(vr.isValid());
+    }
+
+    private void validatePropertyWithELIsInvalid(Validator val, String input, ValidationContext vc) {
+        PropertyValue property = mock(PropertyValue.class);
+        when(vc.newPropertyValue(input)).thenReturn(property);
+        when(property.evaluateAttributeExpressions()).thenReturn(property);
+        when(property.evaluateAttributeExpressions().getValue()).thenReturn(input);
+        ValidationResult vr = val.validate("foo", input, vc);
+        assertFalse(vr.isValid());
+    }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
index da3f3cd..2b0f2e3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -74,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.apache.nifi.util.StringUtils.isEmpty;
 
@@ -91,7 +92,9 @@ import static org.apache.nifi.util.StringUtils.isEmpty;
 })
 public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
 
-    private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+    private static final Pattern SAS_TOKEN_PATTERN = Pattern.compile("^\\?.*$");
+    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+    private static final String FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN = "BlobEndpoint=https://%s.blob.core.windows.net/;SharedAccessSignature=%s";
 
     static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
             .name("event-hub-namespace")
@@ -228,7 +231,17 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
             .sensitive(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .required(true)
+            .required(false)
+            .build();
+    static final PropertyDescriptor STORAGE_SAS_TOKEN = new PropertyDescriptor.Builder()
+            .name("storage-sas-token")
+            .displayName("Storage SAS Token")
+            .description("The Azure Storage SAS token to store Event Hub consumer group state. Always starts with a ? character.")
+            .sensitive(true)
+            .addValidator(StandardValidators.createRegexMatchingValidator(SAS_TOKEN_PATTERN, true,
+                    "Token must start with a ? character."))
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
             .build();
     static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
             .name("storage-container-name")
@@ -261,7 +274,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                 NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, USE_MANAGED_IDENTITY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
                 RECORD_READER, RECORD_WRITER,
                 INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
-                STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
+                STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_SAS_TOKEN, STORAGE_CONTAINER_NAME
         ));
 
         Set<Relationship> relationships = new HashSet<>();
@@ -324,6 +337,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
         final List<ValidationResult> results = new ArrayList<>();
         final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
         final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
+        final String storageAccountKey = validationContext.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        final String storageSasToken = validationContext.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+
         if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
             results.add(new ValidationResult.Builder()
                     .subject("Record Reader and Writer")
@@ -332,6 +348,26 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                     .valid(false)
                     .build());
         }
+
+        if (StringUtils.isBlank(storageAccountKey) && StringUtils.isBlank(storageSasToken)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(String.format("%s or %s",
+                            STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
+                    .explanation(String.format("either %s or %s should be set.",
+                            STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
+                    .valid(false)
+                    .build());
+        }
+
+        if (StringUtils.isNotBlank(storageAccountKey) && StringUtils.isNotBlank(storageSasToken)) {
+            results.add(new ValidationResult.Builder()
+                    .subject(String.format("%s or %s",
+                            STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
+                    .explanation(String.format("%s and %s should not be set at the same time.",
+                            STORAGE_ACCOUNT_KEY.getDisplayName(), STORAGE_SAS_TOKEN.getDisplayName()))
+                    .valid(false)
+                    .build());
+        }
         results.addAll(AzureEventHubUtils.customValidate(ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, validationContext));
         return results;
     }
@@ -579,12 +615,6 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
         final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
         validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
 
-        final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
-        validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
-
-        final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
-        validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey);
-
 
         final String consumerHostname = orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(),
                 EventProcessorHost.createHostName("nifi"));
@@ -617,7 +647,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
                 .evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
         options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
 
-        final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);
+        final String storageConnectionString = createStorageConnectionString(context);
 
         final String connectionString;
         final boolean useManagedIdentity = context.getProperty(USE_MANAGED_IDENTITY).asBoolean();
@@ -630,6 +660,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
             validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
             connectionString = AzureEventHubUtils.getSharedAccessSignatureConnectionString(namespaceName, eventHubName, sasName, sasKey);
         }
+
         eventProcessorHost = EventProcessorHost.EventProcessorHostBuilder
                                 .newBuilder(consumerHostname, consumerGroupName)
                                 .useAzureStorageCheckpointLeaseManager(storageConnectionString, containerName, null)
@@ -645,6 +676,19 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
         eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get();
     }
 
+    private String createStorageConnectionString(final ProcessContext context) {
+        final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
+
+        final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        final String storageSasToken = context.getProperty(STORAGE_SAS_TOKEN).evaluateAttributeExpressions().getValue();
+
+        if (storageAccountKey != null) {
+            return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_ACCOUNT_KEY, storageAccountName, storageAccountKey);
+        }
+        return String.format(FORMAT_STORAGE_CONNECTION_STRING_FOR_SAS_TOKEN, storageAccountName, storageSasToken);
+    }
+
     private String orDefault(String value, String defaultValue) {
         return isEmpty(value) ? defaultValue : value;
     }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
index 17f8442..aef8b4a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -73,8 +73,11 @@ import org.apache.nifi.serialization.record.MockRecordWriter;
 public class TestConsumeAzureEventHub {
     private static final String namespaceName = "nifi-azure-hub";
     private static final String eventHubName = "get-test";
+    private static final String policyName = "test-pn";
+    private static final String policyKey = "test-pk";
     private static final String storageAccountName = "test-sa";
     private static final String storageAccountKey = "test-sa-key";
+    private static final String storageSasToken = "?test-sa-token";
 
     private ConsumeAzureEventHub.EventProcessor eventProcessor;
     private MockProcessSession processSession;
@@ -133,11 +136,71 @@ public class TestConsumeAzureEventHub {
     }
 
     @Test
+    public void testProcessorConfigValidityWithNeitherStorageKeyNorTokenSet() {
+        TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
+        testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testProcessorConfigValidityWithBothStorageKeyAndTokenSet() {
+        TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
+        testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken);
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testProcessorConfigValidityWithTokenSet() {
+        TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
+        testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_SAS_TOKEN, storageSasToken);
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testProcessorConfigValidityWithStorageKeySet() {
+        TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(ConsumeAzureEventHub.EVENT_HUB_NAME,eventHubName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.NAMESPACE,namespaceName);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.ACCESS_POLICY_NAME, policyName);
+        testRunner.setProperty(ConsumeAzureEventHub.POLICY_PRIMARY_KEY, policyKey);
+        testRunner.assertNotValid();
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_NAME, storageAccountName);
+        testRunner.setProperty(ConsumeAzureEventHub.STORAGE_ACCOUNT_KEY, storageAccountKey);
+        testRunner.assertValid();
+    }
+
+    @Test
     public void testReceivedApplicationProperties() throws Exception {
         final EventData singleEvent = EventData.create("one".getBytes(StandardCharsets.UTF_8));
         singleEvent.getProperties().put("event-sender", "Apache NiFi");
         singleEvent.getProperties().put("application", "TestApp");
-        final Iterable<EventData> eventDataList = Arrays.asList(singleEvent);
+        final Iterable<EventData> eventDataList = Collections.singletonList(singleEvent);
         eventProcessor.onEvents(partitionContext, eventDataList);
 
         processSession.assertCommitted();
@@ -150,7 +213,7 @@ public class TestConsumeAzureEventHub {
 
     @Test
     public void testReceiveOne() throws Exception {
-        final Iterable<EventData> eventDataList = Arrays.asList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
+        final Iterable<EventData> eventDataList = Collections.singletonList(EventData.create("one".getBytes(StandardCharsets.UTF_8)));
         eventProcessor.onEvents(partitionContext, eventDataList);
 
         processSession.assertCommitted();
@@ -262,7 +325,7 @@ public class TestConsumeAzureEventHub {
                 .collect(Collectors.toList());
 
         final List<Record> recordSetList = addEndRecord.apply(recordList);
-        final Record[] records = recordSetList.toArray(new Record[recordSetList.size()]);
+        final Record[] records = recordSetList.toArray(new Record[0]);
 
         switch (throwExceptionAt) {
             case -1:
@@ -277,8 +340,8 @@ public class TestConsumeAzureEventHub {
             default:
                 final List<Record> recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt));
                 final List<Record> recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
-                final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]);
-                final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]);
+                final Record[] records1 = recordList1.toArray(new Record[0]);
+                final Record[] records2 = recordList2.toArray(new Record[0]);
                 when(reader.nextRecord())
                         .thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length))
                         .thenThrow(new MalformedRecordException("Simulating Record parse failure."))