You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/01/14 04:30:03 UTC

[nifi] branch master updated: NIFI-6998 This closes #3972. Batch & Partitioning key + Asynchronous sending in limited batches + Property to determine attribute name carrying partitioning key + Maximum batch size property + Carrier object - Unit test fakery NIFI-6998 Attributes to User Defined properties NIFI-6998 Unit tests NIFI-6998 Review corrections + Interruption propagation (& test) + Final carrier members + Unnecessary generic modifiers removed from generic container NIFI-6998 checkstyle corrections + Tabs to spa [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2601f72  NIFI-6998 This closes #3972. Batch & Partitioning key + Asynchronous sending in limited batches + Property to determine attribute name carrying partitioning key + Maximum batch size property + Carrier object - Unit test fakery NIFI-6998 Attributes to User Defined properties NIFI-6998 Unit tests NIFI-6998 Review corrections + Interruption propagation (& test) + Final carrier members + Unnecessary generic modifiers removed from generic container NIFI-6998 checkstyle correc [...]
2601f72 is described below

commit 2601f722b3230a640e86c83320a75b86288c0e60
Author: Martin Šimek <ma...@zf.com>
AuthorDate: Thu Jan 9 17:32:50 2020 +0100

    NIFI-6998 This closes #3972. Batch & Partitioning key
    + Asynchronous sending in limited batches
    + Property to determine attribute name carrying partitioning key
    + Maximum batch size property
    + Carrier object
    - Unit test fakery
    NIFI-6998 Attributes to User Defined properties
    NIFI-6998 Unit tests
    NIFI-6998 Review corrections
    + Interruption propagation (& test)
    + Final carrier members
    + Unnecessary generic modifiers removed from generic container
    NIFI-6998 checkstyle corrections
    + Tabs to spaces, trailing spaces
    + Absolute Imports
    + Braces locations
    NIFI-6998 imports & license
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../azure/eventhub/PutAzureEventHub.java           | 229 +++++++++++++++---
 .../azure/storage/utils/FlowFileResultCarrier.java |  51 ++++
 .../azure/eventhub/PutAzureEventHubTest.java       | 260 +++++++++++++++++++--
 3 files changed, 494 insertions(+), 46 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
index fb584e8..2c9a8ae 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHub.java
@@ -19,10 +19,16 @@ package org.apache.nifi.processors.azure.eventhub;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -35,6 +41,8 @@ import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.IllegalConnectionStringFormatException;
 import com.microsoft.azure.eventhubs.impl.EventHubClientImpl;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -53,6 +61,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
 
@@ -92,6 +101,24 @@ public class PutAzureEventHub extends AbstractProcessor {
             .sensitive(true)
             .required(true)
             .build();
+    static final PropertyDescriptor PARTITIONING_KEY_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
+            .name("partitioning-key-attribute-name")
+            .displayName("Partitioning Key Attribute Name")
+            .description("If specified, the value from argument named by this field will be used as a partitioning key to be used by event hub.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .defaultValue(null)
+            .build();
+    static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("max-batch-size")
+            .displayName("Maximum batch size")
+            .description("Maximum count of flow files being processed in one batch.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .addValidator(StandardValidators.NUMBER_VALIDATOR)
+            .defaultValue("100")
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
@@ -117,6 +144,8 @@ public class PutAzureEventHub extends AbstractProcessor {
         _propertyDescriptors.add(NAMESPACE);
         _propertyDescriptors.add(ACCESS_POLICY);
         _propertyDescriptors.add(POLICY_PRIMARY_KEY);
+        _propertyDescriptors.add(PARTITIONING_KEY_ATTRIBUTE_NAME);
+        _propertyDescriptors.add(MAX_BATCH_SIZE);
         propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -150,6 +179,132 @@ public class PutAzureEventHub extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        populateSenderQueue(context);
+
+        final StopWatch stopWatch = new StopWatch(true);
+
+        final String partitioningKeyAttributeName = context.getProperty(PARTITIONING_KEY_ATTRIBUTE_NAME).getValue();
+
+        // Get N flow files
+        final int maxBatchSize = NumberUtils.toInt(context.getProperty(MAX_BATCH_SIZE).getValue(), 100);
+        final List<FlowFile> flowFileList = session.get(maxBatchSize);
+
+        // Convert and send each flow file
+        final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<>();
+        for (FlowFile flowFile : flowFileList) {
+            if (flowFile == null) {
+                continue;
+            }
+
+            futureQueue.offer(handleFlowFile(flowFile, partitioningKeyAttributeName, session));
+        }
+
+        waitForAllFutures(context, session, stopWatch, futureQueue);
+    }
+
+    /**
+     * Joins all the futures so it can determine which flow files from given batch were sent successfully and which were not.
+     *
+     * @param context of this instance of the processor
+     * @param session that handles all flow files sent within the future queue
+     * @param stopWatch for time measurements
+     * @param futureQueue a queue of futures for messages that were sent within the above context and session before this method was called.
+     */
+    protected void waitForAllFutures(
+            final ProcessContext context,
+            final ProcessSession session,
+            final StopWatch stopWatch,
+            final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue){
+
+        try {
+            for (CompletableFuture<FlowFileResultCarrier<Relationship>> completableFuture : futureQueue) {
+                completableFuture.join();
+
+                final FlowFileResultCarrier<Relationship> flowFileResult = completableFuture.get();
+                if(flowFileResult == null) {
+                    continue;
+                }
+
+                final FlowFile flowFile = flowFileResult.getFlowFile();
+
+                if(flowFileResult.getResult() == REL_SUCCESS) {
+                    final String namespace = context.getProperty(NAMESPACE).getValue();
+                    final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
+                    session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+                    session.transfer(flowFile, REL_SUCCESS);
+
+                } else {
+                    final Throwable processException = flowFileResult.getException();
+                    getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
+                    session.transfer(session.penalize(flowFile), REL_FAILURE);
+                }
+            }
+        } catch (InterruptedException | ExecutionException | CancellationException | CompletionException e) {
+            getLogger().error("Batch processing failed", e);
+            session.rollback();
+
+            if(e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+
+            throw new ProcessException("Batch processing failed", e);
+        }
+    }
+
+    /**
+     * Converts a flow file to an Event Hub message and sends it.
+     *
+     * @param flowFile to be converted to a message and sent to Event Hub (body = content, user properties = attributes, partitioning key = value of the configured attribute)
+     * @param partitioningKeyAttributeName name of the flow file attribute that holds the partitioning key
+     * @param session under which this flow file is being managed
+     *
+     * @return Completable future carrying the context of the flow file used as a base for the message being sent. Never null.
+     * */
+    protected CompletableFuture<FlowFileResultCarrier<Relationship>> handleFlowFile(FlowFile flowFile, final String partitioningKeyAttributeName, final ProcessSession session) {
+
+        // Read message body
+        final byte[] buffer = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
+
+        // Lift partitioning key
+        final String partitioningKey;
+        if (StringUtils.isNotBlank(partitioningKeyAttributeName)) {
+            partitioningKey = flowFile.getAttribute(partitioningKeyAttributeName);
+        } else {
+            partitioningKey = null;
+        }
+
+        // Prepare user properties
+        final Map<String, Object> userProperties;
+        Map<String, String> attributes = flowFile.getAttributes();
+        if(attributes == null) {
+            userProperties = Collections.emptyMap();
+        }else {
+            userProperties = new HashMap<>(attributes);
+        }
+
+        // Send the message
+        try {
+            return sendMessage(buffer, partitioningKey, userProperties)
+                    .thenApplyAsync(param -> {
+                        return new FlowFileResultCarrier<Relationship>(flowFile, REL_SUCCESS);
+                    })
+                    .exceptionally(processException -> {
+                        return new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, processException);
+                    });
+
+        } catch (final ProcessException processException) {
+            return CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile, REL_FAILURE, processException));
+        }
+    }
+
+
+    /**
+     * Prepare at least one Event hub sender based on this instance of processor.
+     *
+     * @param context of this processor instance from which all connectivity information properties are taken.
+     */
+    protected void populateSenderQueue(ProcessContext context) {
         if(senderQueue.size() == 0){
             final int numThreads = context.getMaxConcurrentTasks();
             senderQueue = new LinkedBlockingQueue<>(numThreads);
@@ -165,31 +320,17 @@ public class PutAzureEventHub extends AbstractProcessor {
                 }
             }
         }
-
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final StopWatch stopWatch = new StopWatch(true);
-            final byte[] buffer = new byte[(int) flowFile.getSize()];
-            session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer));
-
-            try {
-                sendMessage(buffer);
-            } catch (final ProcessException processException) {
-                getLogger().error("Failed to send {} to EventHub due to {}; routing to failure", new Object[]{flowFile, processException}, processException);
-                session.transfer(session.penalize(flowFile), REL_FAILURE);
-                return;
-            }
-
-            final String namespace = context.getProperty(NAMESPACE).getValue();
-            final String eventHubName = context.getProperty(EVENT_HUB_NAME).getValue();
-            session.getProvenanceReporter().send(flowFile, "amqps://" + namespace + ".servicebus.windows.net" + "/" + eventHubName, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
-            session.transfer(flowFile, REL_SUCCESS);
-
     }
 
+    /**
+     * @param namespace name of the Eventhub namespace (part of the domain name)
+     * @param eventHubName name of the eventhub, a message broker entity. Like topic.
+     * @param policyName technically it is username bound to eventhub namespace or hub and privileges.
+     * @param policyKey password belonging to the above policy
+     * @param executor thread executor to perform the client connection.
+     * @return An initialized eventhub client based on supplied parameters.
+     * @throws ProcessException when creation of the event hub client fails due to the formatting of the connection string, authorization, or even network connectivity.
+     */
     protected EventHubClient createEventHubClient(
         final String namespace,
         final String eventHubName,
@@ -206,22 +347,44 @@ public class PutAzureEventHub extends AbstractProcessor {
             throw new ProcessException(e);
         }
     }
+
     protected String getConnectionString(final String namespace, final String eventHubName, final String policyName, final String policyKey){
         return new ConnectionStringBuilder().setNamespaceName(namespace).setEventHubName(eventHubName).setSasKeyName(policyName).setSasKey(policyKey).toString();
     }
-    protected void sendMessage(final byte[] buffer) throws ProcessException {
+
+    /**
+     * @param buffer Block of data to be sent as a message body. Entire array is used. See Event hub limits for body size.
+     * @param partitioningKey A hint for Eventhub message broker how to distribute messages consistently amongst multiple partitions.
+     * @param userProperties A key-value set of custom information that is attached to the user-defined properties part of the message.
+     * @return future object for referencing a success/failure of this message sending.
+     * @throws ProcessException when no EventHubClient is available in the sender queue
+     *
+     * @see <a href="https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-quotas">Event Hubs Quotas</a>
+     */
+    protected CompletableFuture<Void> sendMessage(final byte[] buffer, String partitioningKey, Map<String, Object> userProperties) throws ProcessException {
 
         final EventHubClient sender = senderQueue.poll();
-        if(null != sender) {
-            try {
-                sender.sendSync(EventData.create(buffer));
-            } catch (final EventHubException sbe) {
-                throw new ProcessException("Caught exception trying to send message to eventbus", sbe);
-            } finally {
-                senderQueue.offer(sender);
-            }
-        }else{
+        if(sender == null) {
             throw new ProcessException("No EventHubClients are configured for sending");
         }
+
+        // Create message with properties
+        final EventData eventData = EventData.create(buffer);
+        final Map<String, Object> properties = eventData.getProperties();
+        if(userProperties != null && properties != null) {
+            properties.putAll(userProperties);
+        }
+
+        // Send with optional partition key
+        final CompletableFuture<Void> eventFuture;
+        if(StringUtils.isNotBlank(partitioningKey)) {
+            eventFuture = sender.send(eventData, partitioningKey);
+        }else {
+            eventFuture = sender.send(eventData);
+        }
+
+        senderQueue.offer(sender);
+
+        return eventFuture;
     }
 }
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java
new file mode 100644
index 0000000..7c49b99
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/FlowFileResultCarrier.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public class FlowFileResultCarrier<T> {
+
+    final private FlowFile flowFile;
+    final private T result;
+    final private Throwable exception;
+
+    public FlowFileResultCarrier(FlowFile flowFile, T result) {
+        this.flowFile = flowFile;
+        this.result = result;
+        this.exception = null;
+    }
+
+    public FlowFileResultCarrier(FlowFile flowFile, T result, Throwable exception) {
+        this.flowFile = flowFile;
+        this.result = result;
+        this.exception = exception;
+    }
+
+    public FlowFile getFlowFile() {
+        return flowFile;
+    }
+
+    public T getResult() {
+        return result;
+    }
+
+    public Throwable getException() {
+        return exception;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
index ab556c5..83ed649 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/PutAzureEventHubTest.java
@@ -16,20 +16,57 @@
  */
 package org.apache.nifi.processors.azure.eventhub;
 
-import com.microsoft.azure.eventhubs.EventHubClient;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.azure.storage.utils.FlowFileResultCarrier;
+import org.apache.nifi.util.StopWatch;
 import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
-import java.util.concurrent.ScheduledExecutorService;
+import com.google.common.collect.ImmutableMap;
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventhubs.EventHubClient;
 
 public class PutAzureEventHubTest {
     private static final String namespaceName = "nifi-azure-hub";
     private static final String eventHubName = "get-test";
     private static final String sasKeyName = "bogus-policy";
     private static final String sasKey = "9rHmHqxoOVWOb8wS09dvqXYxnNiLqxNMCbmt6qMaQyU!";
+    private static final String TEST_PARTITIONING_KEY_ATTRIBUTE_NAME = "x-opt-partition-key";
+    private static final String TEST_PARTITIONING_KEY = "some-partitioning-key";
 
 
     private TestRunner testRunner;
@@ -92,35 +129,213 @@ public class PutAzureEventHubTest {
         testRunner.run(1, true);
     }
 
+    @Test
+    public void testMessageIsSentWithPartitioningKeyIfSpecifiedAndPopulated() {
+        MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
+        MockitoAnnotations.initMocks(processor);
+
+        EventHubClient eventHubClient = processor.getEventHubClient();
+        when(eventHubClient.send(any(EventData.class), anyString()))
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+        when(eventHubClient.send(any(EventData.class)))
+        .thenThrow(new RuntimeException("Partition-key-less method called despite key is defined and required."));
+
+        testRunner = TestRunners.newTestRunner(processor);
+        setUpStandardTestConfig();
+        testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY_ATTRIBUTE_NAME);
+
+        MockFlowFile flowFile = new MockFlowFile(1234);
+        flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
+        testRunner.enqueue(flowFile);
+        testRunner.run(1, true);
+
+        Mockito.verify(eventHubClient).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
+    }
+
+    @Test
+    public void testMessageIsSentWithoutPartitioningKeyIfNotSpecifiedOrNotPopulated() {
+        MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
+        MockitoAnnotations.initMocks(processor);
+
+        EventHubClient eventHubClient = processor.getEventHubClient();
+        when(eventHubClient.send(any(EventData.class), anyString()))
+        .thenThrow(new RuntimeException("Partition-key-full method called despite key is Not required or not populated."));
+
+        when(eventHubClient.send(any(EventData.class)))
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+        testRunner = TestRunners.newTestRunner(processor);
+        setUpStandardTestConfig();
+
+        MockFlowFile flowFile = new MockFlowFile(1234);
+        flowFile.putAttributes(ImmutableMap.of(TEST_PARTITIONING_KEY_ATTRIBUTE_NAME, TEST_PARTITIONING_KEY));
+
+        // Key not specified
+        testRunner.enqueue(flowFile);
+        testRunner.run(1, true);
+
+        Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
+        Mockito.verify(eventHubClient).send(any(EventData.class));
+
+        // Key wanted but not available
+        testRunner.setProperty(PutAzureEventHub.PARTITIONING_KEY_ATTRIBUTE_NAME, "Non-existing-attribute");
+
+        testRunner.enqueue(flowFile);
+        testRunner.run(1, true);
+
+        Mockito.verify(eventHubClient, never()).send(any(EventData.class), eq(TEST_PARTITIONING_KEY));
+        Mockito.verify(eventHubClient, times(2)).send(any(EventData.class));
+    }
+
+    @Test
+    public void testAllAttributesAreLiftedToProperties() {
+        MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
+        MockitoAnnotations.initMocks(processor);
+
+        EventHubClient eventHubClient = processor.getEventHubClient();
+        when(eventHubClient.send(any(EventData.class)))
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+        testRunner = TestRunners.newTestRunner(processor);
+        setUpStandardTestConfig();
+
+        MockFlowFile flowFile = new MockFlowFile(1234);
+        ImmutableMap<String, String> demoAttributes = ImmutableMap.of("A", "a", "B", "b", "D", "d", "C", "c");
+        flowFile.putAttributes(demoAttributes);
+
+        testRunner.enqueue(flowFile);
+        testRunner.run(1, true);
+        ArgumentCaptor<EventData> eventDataCaptor = ArgumentCaptor.forClass(EventData.class);
+
+        Mockito.verify(eventHubClient).send(eventDataCaptor.capture());
+
+        EventData event = eventDataCaptor.getValue();
+        assertTrue(event.getProperties().entrySet().containsAll(demoAttributes.entrySet()));
+    }
+
+    @Test
+    public void testBatchProcessesUptoMaximum() {
+        MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
+        MockitoAnnotations.initMocks(processor);
+
+        EventHubClient eventHubClient = processor.getEventHubClient();
+
+        CompletableFuture<Void> failedFuture = new CompletableFuture<Void>();
+        failedFuture.completeExceptionally(new IllegalArgumentException());
+
+        when(eventHubClient.send(any(EventData.class)))
+        .thenReturn(failedFuture)
+        .thenReturn(CompletableFuture.completedFuture(null));
+
+        testRunner = TestRunners.newTestRunner(processor);
+        setUpStandardTestConfig();
+
+        List<MockFlowFile> flowFiles = Arrays.asList(new MockFlowFile(1), new MockFlowFile(2), new MockFlowFile(3),
+                new MockFlowFile(4), new MockFlowFile(5), new MockFlowFile(6));
+
+        flowFiles.stream().forEachOrdered(ff -> testRunner.enqueue(ff));
+
+        testRunner.setProperty(PutAzureEventHub.MAX_BATCH_SIZE, "4");
+        testRunner.run(1, true);
+
+        Mockito.verify(eventHubClient, times(4)).send(any(EventData.class));
+        testRunner.assertTransferCount(PutAzureEventHub.REL_SUCCESS, 3);
+        testRunner.assertTransferCount(PutAzureEventHub.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testFailedBatchProcessesRollsBackTransactions() throws InterruptedException, ExecutionException {
+        MockedEventhubClientMockPutAzureEventHub processor = new PutAzureEventHubTest.MockedEventhubClientMockPutAzureEventHub();
+        MockitoAnnotations.initMocks(processor);
+
+        final BlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>> futureQueue = new LinkedBlockingQueue<CompletableFuture<FlowFileResultCarrier<Relationship>>>();
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<FlowFileResultCarrier<Relationship>> throwingFuture = (CompletableFuture<FlowFileResultCarrier<Relationship>>)mock(CompletableFuture.class);
+
+        when(throwingFuture.get()).thenThrow(new ExecutionException(new IllegalArgumentException()));
+
+        MockFlowFile flowFile1 = new MockFlowFile(1);
+        MockFlowFile flowFile2 = new MockFlowFile(2);
+
+        futureQueue.offer(CompletableFuture.completedFuture(null));
+        futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile1, PutAzureEventHub.REL_SUCCESS)));
+        futureQueue.offer(CompletableFuture.completedFuture(new FlowFileResultCarrier<Relationship>(flowFile2, PutAzureEventHub.REL_FAILURE, new IllegalArgumentException())));
+        futureQueue.offer(throwingFuture);
+
+        testRunner = TestRunners.newTestRunner(processor);
+        setUpStandardTestConfig();
+        testRunner.enqueue(flowFile1);
+        testRunner.enqueue(flowFile2);
+
+
+        final ProcessContext context = testRunner.getProcessContext();
+        final ProcessSession session = spy(testRunner.getProcessSessionFactory().createSession());
+        doNothing().when(session).transfer(any(FlowFile.class), any());
+        doReturn(flowFile2).when(session).penalize(any());
+
+        try {
+            processor.waitForAllFutures(context, session,  new StopWatch(true), futureQueue);
+            assertFalse(true);
+        }catch(ProcessException pe) {
+            assertTrue(true);
+            assertFalse(Thread.currentThread().isInterrupted());
+        }
+
+        verify(session).transfer(flowFile1, PutAzureEventHub.REL_SUCCESS);
+        verify(session).transfer(flowFile2, PutAzureEventHub.REL_FAILURE);
+        verify(session).rollback();
+
+        //Second run to test interrupted exception
+        Mockito.reset(throwingFuture, session);
+        when(throwingFuture.get()).thenThrow(new InterruptedException());
+        doNothing().when(session).transfer(any(FlowFile.class), any());
+        doReturn(flowFile2).when(session).penalize(any());
+
+        try {
+            processor.waitForAllFutures(context, session,  new StopWatch(true), futureQueue);
+            assertFalse(true);
+        }catch(ProcessException pe) {
+            assertTrue(true);
+            assertTrue(Thread.currentThread().isInterrupted());
+        }
+
+    }
+
     private static class MockPutAzureEventHub extends PutAzureEventHub{
         byte[] receivedBuffer = null;
+
         byte[] getReceivedBuffer(){
             return receivedBuffer;
         }
 
+
         @Override
         protected EventHubClient createEventHubClient(
-            final String namespace,
-            final String eventHubName,
-            final String policyName,
-            final String policyKey,
-            final ScheduledExecutorService executor) throws ProcessException {
+                final String namespace,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final ScheduledExecutorService executor) throws ProcessException {
             return null;
         }
 
         @Override
-        protected void sendMessage(final byte[] buffer) throws ProcessException {
+        protected CompletableFuture<Void> sendMessage(final byte[] buffer, String partitioningKey, Map<String, Object> userProperties) throws ProcessException {
             receivedBuffer = buffer;
+
+            return CompletableFuture.completedFuture(null);
         }
     }
     private static class OnSendThrowingMockPutAzureEventHub extends PutAzureEventHub{
         @Override
         protected EventHubClient createEventHubClient(
-            final String namespace,
-            final String eventHubName,
-            final String policyName,
-            final String policyKey,
-            final ScheduledExecutorService executor) throws ProcessException {
+                final String namespace,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final ScheduledExecutorService executor) throws ProcessException {
             return null;
         }
     }
@@ -130,6 +345,25 @@ public class PutAzureEventHubTest {
             return "Bogus Connection String";
         }
     }
+    private static class MockedEventhubClientMockPutAzureEventHub extends PutAzureEventHub{
+
+        @Mock
+        private EventHubClient client;
+
+        public EventHubClient getEventHubClient() {
+            return client;
+        }
+
+        @Override
+        protected EventHubClient createEventHubClient(
+                final String namespace,
+                final String eventHubName,
+                final String policyName,
+                final String policyKey,
+                final ScheduledExecutorService executor) throws ProcessException {
+            return client;
+        }
+    }
     private void setUpStandardTestConfig() {
         testRunner.setProperty(PutAzureEventHub.EVENT_HUB_NAME,eventHubName);
         testRunner.setProperty(PutAzureEventHub.NAMESPACE,namespaceName);