You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2018/01/04 19:14:56 UTC

nifi git commit: NIFI-4595: This closes #2264. Add ConsumeAzureEventHub.

Repository: nifi
Updated Branches:
  refs/heads/master 7a8dbb8b1 -> 98af3dc4c


NIFI-4595: This closes #2264. Add ConsumeAzureEventHub.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/98af3dc4
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/98af3dc4
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/98af3dc4

Branch: refs/heads/master
Commit: 98af3dc4cd56da324bb4572aab2dc395338ed45d
Parents: 7a8dbb8
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Nov 8 23:20:58 2017 +0900
Committer: joewitt <jo...@apache.org>
Committed: Thu Jan 4 14:14:17 2018 -0500

----------------------------------------------------------------------
 .../src/main/resources/META-INF/LICENSE         |   6 +-
 .../src/main/resources/META-INF/NOTICE          |  21 +
 .../nifi-azure-processors/pom.xml               |  18 +-
 .../azure/eventhub/ConsumeAzureEventHub.java    | 654 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../eventhub/TestConsumeAzureEventHub.java      | 397 +++++++++++
 6 files changed, 1091 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE
index c527bf7..a821684 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/LICENSE
@@ -206,11 +206,7 @@
   The binary distribution of this product bundles source from 'Microsoft Azure IoT Device Libraries'.
   The source is available under an MIT LICENSE.
 
-	Copyright (c) Microsoft Corporation
-	
-	All rights reserved. 
-	
-	MIT License
+	Copyright (c) 2016 Microsoft Azure
 	
 	Permission is hereby granted, free of charge, to any person obtaining a copy of this software and 
 	associated documentation files (the Software), to deal in the Software without restriction, including 

http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
index 09107c1..25452fb 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/src/main/resources/META-INF/NOTICE
@@ -23,6 +23,27 @@ The following binary components are provided under the Apache Software License v
       Apache Commons IO
       Copyright 2002-2016 The Apache Software Foundation
 
+  (ASLv2) Google GSON
+    The following NOTICE information applies:
+      Copyright 2008 Google Inc.
+
+  (ASLv2) Apache Commons Codec
+    The following NOTICE information applies:
+      Apache Commons Codec
+      Copyright 2002-2014 The Apache Software Foundation
+
+      src/test/org/apache/commons/codec/language/DoubleMetaphoneTest.java
+      contains test data from http://aspell.net/test/orig/batch0.tab.
+      Copyright (C) 2002 Kevin Atkinson (kevina@gnu.org)
+
+      ===============================================================================
+
+      The content of package org.apache.commons.codec.language.bm has been translated
+      from the original php source code available at http://stevemorse.org/phoneticinfo.htm
+      with permission from the original authors.
+      Original source copyright:
+      Copyright (c) 2008 Alexander Beider & Stephen P. Morse.
+
   (ASLv2) Jackson JSON processor
     The following NOTICE information applies:
       # Jackson JSON processor

http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index e43bfe0..49d0afd 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -19,6 +19,9 @@
     </parent>
     <artifactId>nifi-azure-processors</artifactId>
     <packaging>jar</packaging>
+    <properties>
+        <azure-eventhubs.version>0.14.4</azure-eventhubs.version>
+    </properties>
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
@@ -29,9 +32,22 @@
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
-            <version>0.14.2</version>
+            <version>${azure-eventhubs.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-eventhubs-eph</artifactId>
+            <version>${azure-eventhubs.version}</version>
         </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
new file mode 100644
index 0000000..dc18097
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventprocessorhost.CloseReason;
+import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
+import com.microsoft.azure.eventprocessorhost.EventProcessorOptions;
+import com.microsoft.azure.eventprocessorhost.IEventProcessor;
+import com.microsoft.azure.eventprocessorhost.IEventProcessorFactory;
+import com.microsoft.azure.eventprocessorhost.PartitionContext;
+import com.microsoft.azure.servicebus.ConnectionStringBuilder;
+import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.StopWatch;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+
+import static org.apache.nifi.util.StringUtils.isEmpty;
+
+@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
+@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@TriggerSerially
+@WritesAttributes({
+        @WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
+        @WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
+        @WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
+        @WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
+        @WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
+})
+public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
+
+    static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("event-hub-namespace")
+            .displayName("Event Hub Namespace")
+            .description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
+            .name("event-hub-name")
+            .displayName("Event Hub Name")
+            .description("The name of the Azure Event Hub to pull messages from.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    // TODO: Do we need to support custom service endpoints as GetAzureEventHub does? Is it possible?
+    static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
+            .name("event-hub-shared-access-policy-name")
+            .displayName("Shared Access Policy Name")
+            .description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
+            .name("event-hub-shared-access-policy-primary-key")
+            .displayName("Shared Access Policy Primary Key")
+            .description("The primary key of the Event Hub Shared Access Policy.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .sensitive(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
+            .name("event-hub-consumer-group")
+            .displayName("Event Hub Consumer Group")
+            .description("The name of the Event Hub Consumer Group to use.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("$Default")
+            .required(true)
+            .build();
+    static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder()
+            .name("event-hub-consumer-hostname")
+            .displayName("Event Hub Consumer Hostname")
+            .description("The hostname of this Event Hub Consumer instance." +
+                    " If not specified, an unique identifier is generated in 'nifi-<UUID>' format.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("The Record Reader to use for reading received messages." +
+                    " The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema.")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("The Record Writer to use for serializing Records to an output FlowFile." +
+                    " The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema." +
+                    " If not specified, each message will create a FlowFile.")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+
+    static final AllowableValue INITIAL_OFFSET_START_OF_STREAM = new AllowableValue(
+            "start-of-stream", "Start of stream", "Read from the oldest message retained in the stream.");
+    static final AllowableValue INITIAL_OFFSET_END_OF_STREAM = new AllowableValue(
+            "end-of-stream", "End of stream",
+            "Ignore old retained messages even if exist, start reading new ones from now.");
+    static final PropertyDescriptor INITIAL_OFFSET = new PropertyDescriptor.Builder()
+            .name("event-hub-initial-offset")
+            .displayName("Initial Offset")
+            .description("Specify where to start receiving messages if offset is not stored in Azure Storage.")
+            .required(true)
+            .allowableValues(INITIAL_OFFSET_START_OF_STREAM, INITIAL_OFFSET_END_OF_STREAM)
+            .defaultValue(INITIAL_OFFSET_END_OF_STREAM.getValue())
+            .build();
+    static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
+            .name("event-hub-prefetch-count")
+            .displayName("Prefetch Count")
+            .defaultValue("The number of messages to fetch from Event Hub before processing." +
+                    " This parameter affects throughput." +
+                    " The more prefetch count, the better throughput in general, but consumes more resources (RAM)." +
+                    " NOTE: Even though Event Hub client API provides this option," +
+                    " actual number of messages can be pre-fetched is depend on the Event Hub server implementation." +
+                    " It is reported that only one event is received at a time in certain situation." +
+                    " https://github.com/Azure/azure-event-hubs-java/issues/125")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("300")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("event-hub-batch-size")
+            .displayName("Batch Size")
+            .description("The number of messages to process within a NiFi session." +
+                    " This parameter affects throughput and consistency." +
+                    " NiFi commits its session and Event Hub checkpoint after processing this number of messages." +
+                    " If NiFi session is committed, but failed to create an Event Hub checkpoint," +
+                    " then it is possible that the same messages to be retrieved again." +
+                    " The higher number, the higher throughput, but possibly less consistent.")
+            .defaultValue("10")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("event-hub-message-receive-timeout")
+            .displayName("Message Receive Timeout")
+            .description("The amount of time this consumer should wait to receive the Prefetch Count before returning.")
+            .defaultValue("1 min")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
+            .name("storage-account-name")
+            .displayName("Storage Account Name")
+            .description("Name of the Azure Storage account to store Event Hub Consumer Group state.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
+            .name("storage-account-key")
+            .displayName("Storage Account Key")
+            .description("The Azure Storage account key to store Event Hub Consumer Group state.")
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+    static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
+            .name("storage-container-name")
+            .displayName("Storage Container Name")
+            .description("Name of the Azure Storage Container to store Event Hub Consumer Group state." +
+                    " If not specified, Event Hub name is used.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles received from Event Hub.")
+            .build();
+
+    static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
+            .name("parse.failure")
+            .description("If a message from Event Hub cannot be parsed using the configured Record Reader" +
+                    " or failed to be written by the configured Record Writer," +
+                    " the contents of the message will be routed to this Relationship as its own individual FlowFile.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS;
+    private static final Set<Relationship> RECORD_RELATIONSHIPS;
+    private static final List<PropertyDescriptor> PROPERTIES;
+
+    static {
+        PROPERTIES = Collections.unmodifiableList(Arrays.asList(
+                NAMESPACE, EVENT_HUB_NAME, ACCESS_POLICY_NAME, POLICY_PRIMARY_KEY, CONSUMER_GROUP, CONSUMER_HOSTNAME,
+                RECORD_READER, RECORD_WRITER,
+                INITIAL_OFFSET, PREFETCH_COUNT, BATCH_SIZE, RECEIVE_TIMEOUT,
+                STORAGE_ACCOUNT_NAME, STORAGE_ACCOUNT_KEY, STORAGE_CONTAINER_NAME
+        ));
+
+        Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+
+        relationships.add(REL_PARSE_FAILURE);
+        RECORD_RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    private volatile EventProcessorHost eventProcessorHost;
+    private volatile ProcessSessionFactory processSessionFactory;
+    private volatile RecordReaderFactory readerFactory;
+    private volatile RecordSetWriterFactory writerFactory;
+    // The namespace name can not be retrieved from a PartitionContext at EventProcessor.onEvents, so keep it here.
+    private volatile String namespaceName;
+    private volatile boolean isRecordReaderSet = false;
+    private volatile boolean isRecordWriterSet = false;
+
+    /**
+     * For unit test.
+     */
+    void setProcessSessionFactory(ProcessSessionFactory processSessionFactory) {
+        this.processSessionFactory = processSessionFactory;
+    }
+
+    /**
+     * For unit test.
+     */
+    void setNamespaceName(String namespaceName) {
+        this.namespaceName = namespaceName;
+    }
+
+    /**
+     * For unit test.
+     */
+    public void setReaderFactory(RecordReaderFactory readerFactory) {
+        this.readerFactory = readerFactory;
+    }
+
+    /**
+     * For unit test.
+     */
+    public void setWriterFactory(RecordSetWriterFactory writerFactory) {
+        this.writerFactory = writerFactory;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return isRecordReaderSet && isRecordWriterSet ? RECORD_RELATIONSHIPS : RELATIONSHIPS;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        final ControllerService recordReader = validationContext.getProperty(RECORD_READER).asControllerService();
+        final ControllerService recordWriter = validationContext.getProperty(RECORD_WRITER).asControllerService();
+        if ((recordReader != null && recordWriter == null) || (recordReader == null && recordWriter != null)) {
+            results.add(new ValidationResult.Builder()
+                    .subject("Record Reader and Writer")
+                    .explanation(String.format("Both %s and %s should be set in order to write FlowFiles as Records.",
+                            RECORD_READER.getDisplayName(), RECORD_WRITER.getDisplayName()))
+                    .valid(false)
+                    .build());
+        }
+        return results;
+    }
+
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (RECORD_READER.equals(descriptor)) {
+            isRecordReaderSet = !StringUtils.isEmpty(newValue);
+        } else if (RECORD_WRITER.equals(descriptor)) {
+            isRecordWriterSet = !StringUtils.isEmpty(newValue);
+        }
+    }
+
+    public class EventProcessorFactory implements IEventProcessorFactory {
+        @Override
+        public IEventProcessor createEventProcessor(PartitionContext context) throws Exception {
+            final EventProcessor eventProcessor = new EventProcessor();
+            return eventProcessor;
+        }
+    }
+
+    public class EventProcessor implements IEventProcessor {
+
+        @Override
+        public void onOpen(PartitionContext context) throws Exception {
+            getLogger().info("Consumer group {} opened partition {} of {}",
+                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath()});
+        }
+
+        @Override
+        public void onClose(PartitionContext context, CloseReason reason) throws Exception {
+            getLogger().info("Consumer group {} closed partition {} of {}. reason={}",
+                    new Object[]{context.getConsumerGroupName(), context.getPartitionId(), context.getEventHubPath(), reason});
+        }
+
+        @Override
+        public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception {
+            final ProcessSession session = processSessionFactory.createSession();
+
+            try {
+
+                final StopWatch stopWatch = new StopWatch(true);
+
+                if (readerFactory != null && writerFactory != null) {
+                    writeRecords(context, messages, session, stopWatch);
+                } else {
+                    writeFlowFiles(context, messages, session, stopWatch);
+                }
+
+                // Commit NiFi first.
+                session.commit();
+                // If creating an Event Hub checkpoint failed, then the same message can be retrieved again.
+                context.checkpoint();
+
+            } catch (Exception e) {
+                getLogger().error("Unable to fully process received message due to " + e, e);
+                // FlowFiles those are already committed will not get rollback.
+                session.rollback();
+            }
+
+        }
+
+        private void putEventHubAttributes(Map<String, String> attributes, String eventHubName, String partitionId, EventData eventData) {
+            final EventData.SystemProperties systemProperties = eventData.getSystemProperties();
+            if (null != systemProperties) {
+                attributes.put("eventhub.enqueued.timestamp", String.valueOf(systemProperties.getEnqueuedTime()));
+                attributes.put("eventhub.offset", systemProperties.getOffset());
+                attributes.put("eventhub.sequence", String.valueOf(systemProperties.getSequenceNumber()));
+            }
+
+            attributes.put("eventhub.name", eventHubName);
+            attributes.put("eventhub.partition", partitionId);
+        }
+
+        private void writeFlowFiles(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch) {
+            final String eventHubName = context.getEventHubPath();
+            final String partitionId = context.getPartitionId();
+            final String consumerGroup = context.getConsumerGroupName();
+            messages.forEach(eventData -> {
+                FlowFile flowFile = session.create();
+
+                final Map<String, String> attributes = new HashMap<>();
+                putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
+
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                flowFile = session.write(flowFile, out -> {
+                    out.write(eventData.getBytes());
+                });
+
+                transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
+            });
+        }
+
+        private void transferTo(Relationship relationship, ProcessSession session, StopWatch stopWatch,
+                                String eventHubName, String partitionId, String consumerGroup, FlowFile flowFile) {
+            session.transfer(flowFile, relationship);
+            final String transitUri = "amqps://" + namespaceName + ".servicebus.windows.net/" + eventHubName + "/ConsumerGroups/" + consumerGroup + "/Partitions/" + partitionId;
+            session.getProvenanceReporter().receive(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+        }
+
+        private void writeRecords(PartitionContext context, Iterable<EventData> messages, ProcessSession session, StopWatch stopWatch)
+                throws SchemaNotFoundException, IOException {
+
+            final String eventHubName = context.getEventHubPath();
+            final String partitionId = context.getPartitionId();
+            final String consumerGroup = context.getConsumerGroupName();
+            final Map<String, String> schemaRetrievalVariables = new HashMap<>();
+            schemaRetrievalVariables.put("eventhub.name", eventHubName);
+
+            final ComponentLog logger = getLogger();
+            FlowFile flowFile = session.create();
+            final Map<String, String> attributes = new HashMap<>();
+
+            RecordSetWriter writer = null;
+            EventData lastEventData = null;
+            WriteResult lastWriteResult = null;
+            int recordCount = 0;
+
+            try (final OutputStream out = session.write(flowFile)) {
+                for (final EventData eventData : messages) {
+
+                    try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
+                        final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+
+                            if (writer == null) {
+                                // Initialize the writer when the first record is read.
+                                final RecordSchema readerSchema = record.getSchema();
+                                final RecordSchema writeSchema = writerFactory.getSchema(schemaRetrievalVariables, readerSchema);
+                                writer = writerFactory.createWriter(logger, writeSchema, out);
+                                writer.beginRecordSet();
+                            }
+
+                            lastWriteResult = writer.write(record);
+                            recordCount += lastWriteResult.getRecordCount();
+                        }
+
+                        lastEventData = eventData;
+
+                    } catch (Exception e) {
+                        // Write it to the parse failure relationship.
+                        logger.error("Failed to parse message from Azure Event Hub using configured Record Reader and Writer due to " + e, e);
+                        FlowFile failed = session.create();
+                        session.write(failed, o -> o.write(eventData.getBytes()));
+                        putEventHubAttributes(attributes, eventHubName, partitionId, eventData);
+                        failed = session.putAllAttributes(failed, attributes);
+                        transferTo(REL_PARSE_FAILURE, session, stopWatch, eventHubName, partitionId, consumerGroup, failed);
+                    }
+                }
+
+                if (lastEventData != null) {
+                    putEventHubAttributes(attributes, eventHubName, partitionId, lastEventData);
+
+                    attributes.put("record.count", String.valueOf(recordCount));
+                    if (writer != null) {
+                        writer.finishRecordSet();
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        if (lastWriteResult != null) {
+                            attributes.putAll(lastWriteResult.getAttributes());
+                        }
+
+                        try {
+                            writer.close();
+                        } catch (IOException e) {
+                            logger.warn("Failed to close Record Writer due to {}" + e, e);
+                        }
+                    }
+                }
+            }
+
+            // This part has to be outside of 'session.write(flowFile)' code block.
+            if (lastEventData != null) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                transferTo(REL_SUCCESS, session, stopWatch, eventHubName, partitionId, consumerGroup, flowFile);
+            } else {
+                // If there's no successful event data, then remove the FlowFile.
+                session.remove(flowFile);
+            }
+        }
+
+        @Override
+        public void onError(PartitionContext context, Throwable e) {
+            if (e instanceof ReceiverDisconnectedException && e.getMessage().startsWith("New receiver with higher epoch of ")) {
+                // This is a known behavior in a NiFi cluster where multiple nodes consumes from the same Event Hub.
+                // Once another node connects, some partitions are given to that node to distribute consumer load.
+                // When that happens, this exception is thrown.
+                getLogger().info("New receiver took over partition {} of Azure Event Hub {}, consumerGroupName={}, message={}",
+                        new Object[]{context.getPartitionId(), context.getEventHubPath(), context.getConsumerGroupName(), e.getMessage()});
+                return;
+            }
+            getLogger().error("An error occurred while receiving messages from Azure Event Hub {} at partition {}," +
+                            " consumerGroupName={}, exception={}",
+                    new Object[]{context.getEventHubPath(), context.getPartitionId(), context.getConsumerGroupName(), e}, e);
+        }
+
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
+
+        if (eventProcessorHost == null) {
+            try {
+                registerEventProcessor(context);
+            } catch (IllegalArgumentException e) {
+                // In order to show simple error message without wrapping it by another ProcessException, just throw it as it is.
+                throw e;
+            } catch (Exception e) {
+                throw new ProcessException("Failed to register the event processor due to " + e, e);
+            }
+            processSessionFactory = sessionFactory;
+
+            readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+            writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        }
+
+        // After a EventProcessor is registered successfully, nothing has to be done at onTrigger
+        // because new sessions are created when new messages are arrived by the EventProcessor.
+        context.yield();
+    }
+
+    @OnStopped
+    public void unregisterEventProcessor(final ProcessContext context) {
+        if (eventProcessorHost != null) {
+            try {
+                eventProcessorHost.unregisterEventProcessor();
+                eventProcessorHost = null;
+                processSessionFactory = null;
+                readerFactory = null;
+                writerFactory = null;
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to unregister the event processor due to " + e, e);
+            }
+        }
+    }
+
+    private void registerEventProcessor(final ProcessContext context) throws Exception {
+        // Validate required properties.
+        final String consumerGroupName = context.getProperty(CONSUMER_GROUP).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(CONSUMER_GROUP, consumerGroupName);
+
+        namespaceName = context.getProperty(NAMESPACE).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(NAMESPACE, namespaceName);
+
+        final String eventHubName = context.getProperty(EVENT_HUB_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(EVENT_HUB_NAME, eventHubName);
+
+        final String sasName = context.getProperty(ACCESS_POLICY_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(ACCESS_POLICY_NAME, sasName);
+
+        final String sasKey = context.getProperty(POLICY_PRIMARY_KEY).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(POLICY_PRIMARY_KEY, sasKey);
+
+        final String storageAccountName = context.getProperty(STORAGE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(STORAGE_ACCOUNT_NAME, storageAccountName);
+
+        final String storageAccountKey = context.getProperty(STORAGE_ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        validateRequiredProperty(STORAGE_ACCOUNT_KEY, storageAccountKey);
+
+
+        final String consumerHostname = orDefault(context.getProperty(CONSUMER_HOSTNAME).evaluateAttributeExpressions().getValue(),
+                EventProcessorHost.createHostName("nifi"));
+
+        final String containerName = orDefault(context.getProperty(STORAGE_CONTAINER_NAME).evaluateAttributeExpressions().getValue(),
+                eventHubName);
+
+
+        final EventProcessorOptions options = new EventProcessorOptions();
+        final String initialOffset = context.getProperty(INITIAL_OFFSET).getValue();
+        if (INITIAL_OFFSET_START_OF_STREAM.getValue().equals(initialOffset)) {
+            options.setInitialOffsetProvider(options.new StartOfStreamInitialOffsetProvider());
+        } else if (INITIAL_OFFSET_END_OF_STREAM.getValue().equals(initialOffset)){
+            options.setInitialOffsetProvider(options.new EndOfStreamInitialOffsetProvider());
+        } else {
+            throw new IllegalArgumentException("Initial offset " + initialOffset + " is not allowed.");
+        }
+
+        final Integer prefetchCount = context.getProperty(PREFETCH_COUNT).evaluateAttributeExpressions().asInteger();
+        if (prefetchCount != null && prefetchCount > 0) {
+            options.setPrefetchCount(prefetchCount);
+        }
+
+        final Integer batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        if (batchSize != null && batchSize > 0) {
+            options.setMaxBatchSize(batchSize);
+        }
+
+        final Long receiveTimeoutMillis = context.getProperty(RECEIVE_TIMEOUT)
+                .evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));
+
+        final String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, storageAccountName, storageAccountKey);
+
+        final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);
+
+        eventProcessorHost = new EventProcessorHost(consumerHostname, eventHubName, consumerGroupName, eventHubConnectionString.toString(), storageConnectionString, containerName);
+
+        options.setExceptionNotification(e -> {
+            getLogger().error("An error occurred while receiving messages from Azure Event Hub {}" +
+                            " at consumer group {} and partition {}, action={}, hostname={}, exception={}",
+                    new Object[]{eventHubName, consumerGroupName, e.getPartitionId(), e.getAction(), e.getHostname()}, e.getException());
+        });
+
+
+        eventProcessorHost.registerEventProcessorFactory(new EventProcessorFactory(), options).get();
+    }
+
+    private String orDefault(String value, String defaultValue) {
+        return isEmpty(value) ? defaultValue : value;
+    }
+
+    private void validateRequiredProperty(PropertyDescriptor property, String value) {
+        if (isEmpty(value)) {
+            throw new IllegalArgumentException(String.format("'%s' is required, but not specified.", property.getDisplayName()));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 84b3300..073d674 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -14,6 +14,7 @@
 # limitations under the License.
 org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
 org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
+org.apache.nifi.processors.azure.eventhub.ConsumeAzureEventHub
 org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
 org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
 org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/98af3dc4/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
new file mode 100644
index 0000000..1f54f39
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.eventhub;
+
+import com.microsoft.azure.eventhubs.EventData;
+import com.microsoft.azure.eventprocessorhost.PartitionContext;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestConsumeAzureEventHub {
+
+    private ConsumeAzureEventHub.EventProcessor eventProcessor;
+    private MockProcessSession processSession;
+    private SharedSessionState sharedState;
+    private PartitionContext partitionContext;
+    private ConsumeAzureEventHub processor;
+
+    @Before
+    public void setupProcessor() {
+        processor = new ConsumeAzureEventHub();
+        final ProcessorInitializationContext initContext = Mockito.mock(ProcessorInitializationContext.class);
+        final String componentId = "componentId";
+        when(initContext.getIdentifier()).thenReturn(componentId);
+        MockComponentLog componentLog = new MockComponentLog(componentId, processor);
+        when(initContext.getLogger()).thenReturn(componentLog);
+        processor.initialize(initContext);
+
+        final ProcessSessionFactory processSessionFactory = Mockito.mock(ProcessSessionFactory.class);
+        processor.setProcessSessionFactory(processSessionFactory);
+        processor.setNamespaceName("namespace");
+
+        sharedState = new SharedSessionState(processor, new AtomicLong(0));
+        processSession = new MockProcessSession(sharedState, processor);
+        when(processSessionFactory.createSession()).thenReturn(processSession);
+
+        eventProcessor = processor.new EventProcessor();
+
+        partitionContext = Mockito.mock(PartitionContext.class);
+        when(partitionContext.getEventHubPath()).thenReturn("eventhub-name");
+        when(partitionContext.getPartitionId()).thenReturn("partition-id");
+        when(partitionContext.getConsumerGroupName()).thenReturn("consumer-group");
+    }
+
+    @Test
+    public void testReceiveOne() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(new EventData("one".getBytes(StandardCharsets.UTF_8)));
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        msg1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        msg1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+    }
+
+
+    @Test
+    public void testReceiveTwo() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(2, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        final MockFlowFile msg2 = flowFiles.get(1);
+        msg2.assertContentEquals("two");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+    }
+
+    @Test
+    public void testCheckpointFailure() throws Exception {
+
+        final Iterable<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+        doThrow(new RuntimeException("Failed to create a checkpoint.")).when(partitionContext).checkpoint();
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        // Even if it fails to create a checkpoint, these FlowFiles are already committed.
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(2, flowFiles.size());
+        final MockFlowFile msg1 = flowFiles.get(0);
+        msg1.assertContentEquals("one");
+        final MockFlowFile msg2 = flowFiles.get(1);
+        msg2.assertContentEquals("two");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+    }
+
+    private Record toRecord(String value) {
+        Map<String, Object> map = new HashMap<>();
+        map.put("value", value);
+        return new MapRecord(new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("value", RecordFieldType.STRING.getDataType()))), map);
+    }
+
+    private void setupRecordWriter() throws SchemaNotFoundException, IOException {
+        setupRecordWriter(null);
+    }
+
+    private void setupRecordWriter(String throwErrorWith) throws SchemaNotFoundException, IOException {
+        final RecordSetWriterFactory writerFactory = mock(RecordSetWriterFactory.class);
+        processor.setWriterFactory(writerFactory);
+        final RecordSetWriter writer = mock(RecordSetWriter.class);
+        final AtomicReference<OutputStream> outRef = new AtomicReference<>();
+        when(writerFactory.createWriter(any(), any(), any())).thenAnswer(invocation -> {
+            outRef.set(invocation.getArgumentAt(2, OutputStream.class));
+            return writer;
+        });
+        when(writer.write(any(Record.class))).thenAnswer(invocation -> {
+            final String value = (String) invocation.getArgumentAt(0, Record.class).getValue("value");
+            if (throwErrorWith != null && throwErrorWith.equals(value)) {
+                throw new IOException("Simulating record write failure.");
+            }
+            outRef.get().write(value.getBytes(StandardCharsets.UTF_8));
+            return WriteResult.of(1, Collections.emptyMap());
+        });
+    }
+
+    private void setupRecordReader(List<EventData> eventDataList) throws MalformedRecordException, IOException, SchemaNotFoundException {
+        setupRecordReader(eventDataList, -1, null);
+    }
+
+    private void setupRecordReader(List<EventData> eventDataList, int throwExceptionAt, String writeFailureWith)
+            throws MalformedRecordException, IOException, SchemaNotFoundException {
+        final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
+        processor.setReaderFactory(readerFactory);
+        final RecordReader reader = mock(RecordReader.class);
+        when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
+        final List<Record> recordList = eventDataList.stream()
+                .map(eventData -> toRecord(new String(eventData.getBytes())))
+                .collect(Collectors.toList());
+
+        // Add null to indicate the end of records.
+        final Function<List<Record>, List<Record>> addEndRecord = rs -> rs.stream()
+                // If the record is simulated to throw an exception when writing, do not add a null record to avoid messing up indices.
+                .flatMap(r -> r.getAsString("value").equals(writeFailureWith) ? Stream.of(r) : Stream.of(r, null))
+                .collect(Collectors.toList());
+
+        final List<Record> recordSetList = addEndRecord.apply(recordList);
+        final Record[] records = recordSetList.toArray(new Record[recordSetList.size()]);
+
+        switch (throwExceptionAt) {
+            case -1:
+                when(reader.nextRecord())
+                        .thenReturn(records[0], Arrays.copyOfRange(records, 1, records.length));
+                break;
+            case 0:
+                when(reader.nextRecord())
+                        .thenThrow(new MalformedRecordException("Simulating Record parse failure."))
+                        .thenReturn(records[0], Arrays.copyOfRange(records, 1, records.length));
+                break;
+            default:
+                final List<Record> recordList1 = addEndRecord.apply(recordList.subList(0, throwExceptionAt));
+                final List<Record> recordList2 = addEndRecord.apply(recordList.subList(throwExceptionAt + 1, recordList.size()));
+                final Record[] records1 = recordList1.toArray(new Record[recordList1.size()]);
+                final Record[] records2 = recordList2.toArray(new Record[recordList2.size()]);
+                when(reader.nextRecord())
+                        .thenReturn(records1[0], Arrays.copyOfRange(records1, 1, records1.length))
+                        .thenThrow(new MalformedRecordException("Simulating Record parse failure."))
+                        .thenReturn(records2[0], Arrays.copyOfRange(records2, 1, records2.length));
+        }
+
+    }
+
+    @Test
+    public void testReceiveRecords() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onetwo");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+    }
+
+    @Test
+    public void testReceiveRecordReaderFailure() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8)),
+                new EventData("three".getBytes(StandardCharsets.UTF_8)),
+                new EventData("four".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, 2, null);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onetwofour");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("three");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+
+        final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri());
+    }
+
+    @Test
+    public void testReceiveAllRecordFailure() throws Exception {
+
+        final List<EventData> eventDataList = Collections.singletonList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, 0, null);
+
+        setupRecordWriter();
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(0, flowFiles.size());
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("one");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(1, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+
+    }
+
+    @Test
+    public void testReceiveRecordWriterFailure() throws Exception {
+
+        final List<EventData> eventDataList = Arrays.asList(
+                new EventData("one".getBytes(StandardCharsets.UTF_8)),
+                new EventData("two".getBytes(StandardCharsets.UTF_8)),
+                new EventData("three".getBytes(StandardCharsets.UTF_8)),
+                new EventData("four".getBytes(StandardCharsets.UTF_8))
+        );
+
+        setupRecordReader(eventDataList, -1, "two");
+
+        setupRecordWriter("two");
+
+        eventProcessor.onEvents(partitionContext, eventDataList);
+
+        processSession.assertCommitted();
+        final List<MockFlowFile> flowFiles = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile ff1 = flowFiles.get(0);
+        ff1.assertContentEquals("onethreefour");
+        ff1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        ff1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<MockFlowFile> failedFFs = processSession.getFlowFilesForRelationship(ConsumeAzureEventHub.REL_PARSE_FAILURE);
+        assertEquals(1, failedFFs.size());
+        final MockFlowFile failed1 = failedFFs.get(0);
+        failed1.assertContentEquals("two");
+        failed1.assertAttributeEquals("eventhub.name", "eventhub-name");
+        failed1.assertAttributeEquals("eventhub.partition", "partition-id");
+
+        final List<ProvenanceEventRecord> provenanceEvents = sharedState.getProvenanceEvents();
+        assertEquals(2, provenanceEvents.size());
+
+        final ProvenanceEventRecord provenanceEvent1 = provenanceEvents.get(0);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent1.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent1.getTransitUri());
+
+        final ProvenanceEventRecord provenanceEvent2 = provenanceEvents.get(1);
+        assertEquals(ProvenanceEventType.RECEIVE, provenanceEvent2.getEventType());
+        assertEquals("amqps://namespace.servicebus.windows.net/" +
+                "eventhub-name/ConsumerGroups/consumer-group/Partitions/partition-id", provenanceEvent2.getTransitUri());
+    }
+
+}