You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2020/10/26 10:15:32 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ed1a1c  MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP
7ed1a1c is described below

commit 7ed1a1cdb94898085b56c0db2366e3dd23451124
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Mon Oct 26 10:59:16 2020 +0100

    MINIFICPP-1388 Introduce buffer for HTTP requests in ListenHTTP
    
    This closes #921
    
    Signed-off-by: Marton Szasz <sz...@gmail.com>
---
 PROCESSORS.md                                      | 316 +++++++++++----------
 extensions/civetweb/processors/ListenHTTP.cpp      | 236 ++++++++-------
 extensions/civetweb/processors/ListenHTTP.h        |  81 +++---
 extensions/civetweb/tests/ListenHTTPTests.cpp      | 224 +++++++++++----
 .../http-curl/tests/HttpPostIntegrationTest.cpp    |   2 +-
 libminifi/src/io/FileStream.cpp                    |   7 +-
 6 files changed, 511 insertions(+), 355 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 3aa7222..c9483d2 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -45,15 +45,15 @@
 - [UpdateAttribute](#updateattribute)
 ## AppendHostInfo
 
-### Description 
+### Description
 
 Appends host information such as IP address and hostname as an attribute to incoming flowfiles.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Hostname Attribute|source.hostname||Flowfile attribute used to record the agent's hostname|
 |IP Attribute|source.ipv4||Flowfile attribute used to record the agent's IP address|
 |Network Interface Name|eth0||Network interface from which to read an IP v4 address|
@@ -66,15 +66,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ApplyTemplate
 
-### Description 
+### Description
 
 Applies the mustache template specified by the "Template" property and writes the output to the flow file content. FlowFile attributes are used as template parameters.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Template|||Path to the input mustache template file|
 ### Relationships
 
@@ -85,15 +85,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## BinFiles
 
-### Description 
+### Description
 
 Bins flow files into buckets based on the number of entries or size of entries
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Max Bin Age|||The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit>|
 |Maximum Group Size|||The maximum size for the bundle. If not specified, there is no maximum.|
 |Maximum Number of Entries|||The maximum number of files to include in a bundle. If not specified, there is no maximum.|
@@ -110,15 +110,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## CapturePacket
 
-### Description 
+### Description
 
 CapturePacket captures and writes one or more packets into a PCAP file that will be used as the content of a flow file. Configuration options exist to adjust the batching of PCAP files. PCAP batching will place a single PCAP into a flow file. A regular expression selects network interfaces. Bluetooth network interfaces can be selected through a separate option.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Base Directory|/tmp/||Scratch directory for PCAP files|
 |Batch Size|50||The number of packets to combine within a given PCAP|
 |Capture Bluetooth|false||True indicates that we support bluetooth interfaces|
@@ -132,15 +132,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## CaptureRTSPFrame
 
-### Description 
+### Description
 
 Captures a frame from the RTSP stream at specified intervals.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Image Encoding|.jpg||The encoding that should be applied to the frame images captured from the RTSP stream|
 |RTSP Hostname|||Hostname of the RTSP stream we are trying to connect to|
 |RTSP Password|||Password used to connect to the RTSP stream|
@@ -157,15 +157,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## CompressContent
 
-### Description 
+### Description
 
 Compresses or decompresses the contents of FlowFiles using a user-specified compression algorithm and updates the mime.type attribute as appropriate
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Compression Format|use mime.type attribute||The compression format to use.|
 |Compression Level|1||The compression level to use; this is valid only when using GZIP compression.|
 |Mode|compress||Indicates whether the processor should compress content or decompress content.|
@@ -180,15 +180,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ConsumeMQTT
 
-### Description 
+### Description
 
 This Processor gets the contents of a FlowFile from an MQTT broker for a specified topic. The payload of the MQTT message becomes content of a FlowFile
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Broker URI|||The URI to use to connect to the MQTT broker|
 |Client ID|||MQTT client ID to use|
 |Connection Timeout|30 sec||Maximum time interval the client will wait for the network connection to the MQTT server|
@@ -209,15 +209,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ExecuteProcess
 
-### Description 
+### Description
 
 Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Batch Duration|0 sec||If the process is expected to be long-running and produce textual output, a batch duration can be specified.|
 |Command|||Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.<br/>**Supports Expression Language: true**|
 |Command Arguments|||The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.<br/>**Supports Expression Language: true**|
@@ -232,16 +232,16 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ExecutePythonProcessor
 
-### Description 
+### Description
 
 Executes a script given the flow file and a process session. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each  [...]
 
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Script File|||Path to script file to execute. Only one of Script File or Script Body may be used|
 |Script Body|||Script to execute|
 ### Relationships
@@ -254,15 +254,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ExecuteSQL
 
-### Description 
+### Description
 
 Execute provided SQL query. Query result rows will be outputted as new flow files with attribute keys equal to result column names and values equal to result values. There will be one output FlowFile per result row. This processor can be scheduled to run using the standard timer-based scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the query.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Connection URL|||The database URL to connect to|
 |SQL Statement|||The SQL statement to execute|
 ### Relationships
@@ -276,15 +276,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ExecuteScript
 
-### Description 
+### Description
 
 Executes a script given the flow file and a process session. The script is responsible for handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back. Scripts must define an onTrigger function which accepts NiFi Context and Property objects. For efficiency, scripts are executed once when the processor is run, then the onTrigger method is called for each  [...]
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Module Directory|||Comma-separated list of paths to files and/or directories which contain modules required by the script|
 |Script Body|||Body of script to execute. Only one of Script File or Script Body may be used|
 |Script Engine|python||The engine to execute scripts (python, lua)|
@@ -300,15 +300,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ExtractText
 
-### Description 
+### Description
 
 Extracts the content of a FlowFile and places it into an attribute.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Attribute|||Attribute to set from content|
 |Enable Case-insensitive Matching|false||Indicates that two characters match even if they are in a different case. |
 |Enable repeating capture group|false||If set to true, every string matching the capture groups will be extracted. Otherwise, if the Regular Expression matches more than once, only the first match will be extracted.|
@@ -325,15 +325,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## FetchOPCProcessor
 
-### Description 
+### Description
 
 Fetches OPC-UA node
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Application URI|||Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names.|
 |Certificate path|||Path to the DER-encoded cert file|
 |Key path|||Path to the DER-encoded key file|
@@ -356,15 +356,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## FetchSFTP
 
-### Description 
+### Description
 
 Fetches the content of a file from a remote SFTP server and overwrites the contents of an incoming FlowFile with the content of the remote file.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |**Completion Strategy**|None|Delete File<br>Move File<br>None<br>|Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.|
 |**Connection Timeout**|30 sec||Amount of time to wait before timing out while creating a connection|
 |**Create Directory**|false||Specifies whether or not the remote directory should be created if it does not exist.|
@@ -399,15 +399,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## FocusArchiveEntry
 
-### Description 
+### Description
 
 Allows manipulation of entries within an archive (e.g. TAR) by focusing on one entry within the archive at a time. When an archive entry is focused, that entry is treated as the content of the FlowFile and may be manipulated independently of the rest of the archive. To restore the FlowFile to its original state, use UnfocusArchiveEntry.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Path|||The path within the archive to focus ("/" to focus the total archive)|
 ### Relationships
 
@@ -418,15 +418,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## GenerateFlowFile
 
-### Description 
+### Description
 
 This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful for load testing, configuration, and simulation.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Batch Size|1||The number of FlowFiles to be transferred in each invocation|
 |Data Format|Binary|Text<br>Binary<br>|Specifies whether the data should be Text or Binary|
 |File Size|1 kB||The size of the file that will be used|
@@ -440,15 +440,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## GetFile
 
-### Description 
+### Description
 
 Creates FlowFiles from files in a directory. MiNiFi will ignore files for which it doesn't have read permissions.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Batch Size|10||The maximum number of files to pull in each iteration|
 |File Filter|[^\.].*||Only files whose names match the given regular expression will be picked up|
 |Ignore Hidden Files|true||Indicates whether or not hidden files should be ignored|
@@ -469,15 +469,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## GetGPS
 
-### Description 
+### Description
 
 Obtains GPS coordinates from the GPSDHost and port.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |GPSD Host|localhost||The host running the GPSD daemon|
 |GPSD Port|2947||The GPSD daemon port|
 |GPSD Wait Time|50000000||Timeout value for waiting for data from the GPSD instance|
@@ -490,15 +490,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## GetTCP
 
-### Description 
+### Description
 
 Establishes a TCP Server that defines and retrieves one or more byte messages from clients
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |SSL Context Service|||SSL Context Service Name|
 |Stay Connected|true||Determines if we keep the same socket despite having no data|
 |concurrent-handler-count|1||Number of concurrent handlers for this session|
@@ -516,15 +516,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## GetUSBCamera
 
-### Description 
+### Description
 
 Gets images from USB Video Class (UVC)-compatible devices. Outputs one flow file per frame at the rate specified by the FPS property in the format specified by the Format property.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |FPS|1||Frames per second to capture from USB camera|
 |Format|PNG||Frame format (currently only PNG and RAW are supported; RAW is a binary pixel buffer of RGB values)|
 |Height|||Target height of image to capture from USB camera|
@@ -542,18 +542,18 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## HashContent
 
-### Description 
+### Description
 
 HashContent calculates the checksum of the content of the flowfile and adds it as an attribute. Configuration options exist to select hashing algorithm and set the name of the attribute.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Hash Algorithm|SHA256||Name of the algorithm used to generate checksum|
 |Hash Attribute|Checksum||Attribute to store checksum to|
-### Properties 
+### Properties
 
 | Name | Description |
 | - | - |
@@ -563,15 +563,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## InvokeHTTP
 
-### Description 
+### Description
 
 An HTTP client processor which can interact with a configurable HTTP Endpoint. The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH).
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Always Output Response|false||Will force a response FlowFile to be generated and routed to the 'Response' relationship regardless of what the server status code received is |
 |Attributes to Send|||Regular expression that defines which attributes to send as HTTP headers in the request. If not defined, no attributes are sent as headers.|
 |Connection Timeout|5 secs||Max wait time for connection to remote service.|
@@ -597,15 +597,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ListSFTP
 
-### Description 
+### Description
 
 Performs a listing of the files residing on an SFTP server. For each file that is found on the remote server, a new FlowFile will be created with the filename attribute set to the name of the file on the remote server. This can then be used in conjunction with FetchSFTP in order to fetch those files.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |**Connection Timeout**|30 sec||Amount of time to wait before timing out while creating a connection|
 |**Data Timeout**|30 sec||When transferring a file between the local and remote system, this value specifies how long is allowed to elapse without any data being transferred between systems|
 |Entity Tracking Initial Listing Target|All Available|All Available<br>Tracking Time Window<br>|Specify how initial listing should be handled. Used by 'Tracking Entities' strategy.|
@@ -646,17 +646,19 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ListenHTTP
 
-### Description 
+### Description
 
 Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be http://{hostname}:{port}/contentListener. Only HEAD, POST, and GET requests are supported. PUT, and DELETE will result in an error and the HTTP response status code 405. The response body text for all requests, by default, is empty (length of 0). A static response body can be set for a given URI by sending input files to ListenHTTP with the http.typ [...]
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Authorized DN Pattern|.*||A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.|
 |Base Path|contentListener||Base path for incoming connections|
+|Batch Size|0||Maximum number of buffered requests to be processed in a single batch. If set to zero all buffered requests are processed.|
+|Buffer Size|0||Maximum number of HTTP Requests allowed to be buffered before processing them when the processor is triggered. If the buffer is full, the request is refused. If set to zero the buffer is unlimited.|
 |HTTP Headers to receive as Attributes (Regex)|||Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes|
 |**Listening Port**|80||The Port to listen on for incoming connections. 0 means port is going to be selected randomly.|
 |SSL Certificate|||File containing PEM-formatted file including TLS/SSL certificate and key|
@@ -672,15 +674,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ListenSyslog
 
-### Description 
+### Description
 
 Listens for Syslog messages being sent to a given port over TCP or UDP. Incoming messages are checked against regular expressions for RFC5424 and RFC3164 formatted messages. The format of each message is: (<PRIORITY>)(VERSION )(TIMESTAMP) (HOSTNAME) (BODY) where version is optional. The timestamp can be an RFC5424 timestamp with a format of "yyyy-MM-dd'T'HH:mm:ss.SZ" or "yyyy-MM-dd'T'HH:mm:ss.S+hh:mm", or it can be an RFC3164 timestamp with a format of "MMM d HH:mm:ss". If an incoming me [...]
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Max Batch Size|1||The maximum number of Syslog events to add to a single FlowFile.|
 |Max Number of TCP Connections|2||The maximum number of concurrent connections to accept Syslog messages in TCP mode.|
 |Max Size of Socket Buffer|1 MB||The maximum size of the socket buffer that should be used.|
@@ -699,15 +701,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## LogAttribute
 
-### Description 
+### Description
 
 Logs attributes of flow files in the MiNiFi application log.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Attributes to Ignore|||A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.|
 |Attributes to Log|||A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.|
 |FlowFiles To Log|1||Number of flow files to log. If set to zero all flow files will be logged. Please note that this may block other threads from running if not used judiciously.|
@@ -725,15 +727,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## ManipulateArchive
 
-### Description 
+### Description
 
 Performs an operation which manipulates an archive without needing to split the archive into multiple FlowFiles.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |After|||For operations which result in new entries, places the new entry after the entry specified by this property.|
 |Before|||For operations which result in new entries, places the new entry before the entry specified by this property.|
 |Destination|||Destination for operations (touch, move or copy) which result in new entries.|
@@ -749,16 +751,16 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## MergeContent
 
-### Description 
+### Description
 
 Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. MergeContent should be configured with only one incoming connection as it won't create grouped Flow Files.This processor updates the mime.type attribute as appropriate.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
+| Name | Default Value | Allowable Values | Description |
 | - | - | - | - |
-|Attribute Strategy|Keep Only Common Attributes|Keep Only Common Attributes<Br>Keep All Unique Attributes<Br>|Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile (in which case neither, or none, of the conflicting attributes will be kept). If 'Keep Only Common Attributes' is selected, only the attributes  [...]
+|Attribute Strategy|Keep Only Common Attributes|Keep Only Common Attributes<Br>Keep All Unique Attributes<Br>|Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile (in which case neither, or none, of the conflicting attributes will be kept). If 'Keep Only Common Attributes' is selected, only the attributes  [...]
 |Correlation Attribute Name|||Correlation Attribute Name|
 |Delimiter Strategy|Filename||Determines if Header, Footer, and Demarcator should point to files|
 |Demarcator File|||Filename specifying the demarcator to use|
@@ -784,15 +786,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## MotionDetector
 
-### Description 
+### Description
 
 Detect motion from captured images.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |**Dilate iteration**|10||For image processing, if an object is detected as 2 separate objects, increase this value|
 |**Image Encoding**|.jpg|.jpg<br>.png<br>|The encoding that should be applied to the output|
 |**Minimum Area**|100||We only consider the movement regions with area greater than this.|
@@ -808,15 +810,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## PublishKafka
 
-### Description 
+### Description
 
 This Processor puts the contents of a FlowFile to a Topic in Apache Kafka. The content of a FlowFile becomes the contents of a Kafka message. This message is optionally assigned a key by using the <Kafka Key> Property.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Attributes to Send as Headers|||Any attribute whose name matches the regex will be added to the Kafka messages as a Header|
 |Batch Size|10||Maximum number of messages batched in one MessageSet|
 |**Client Name**|||Client Name to use when communicating with Kafka<br/>**Supports Expression Language: true**|
@@ -853,15 +855,15 @@ Supports Expression Language: true (will be evaluated using flow file attributes
 
 ## PublishMQTT
 
-### Description 
+### Description
 
 PublishMQTT serializes FlowFile content as an MQTT payload, sending the message to the configured topic and broker.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Broker URI|||The URI to use to connect to the MQTT broker|
 |Client ID|||MQTT client ID to use|
 |Connection Timeout|30 sec||Maximum time interval the client will wait for the network connection to the MQTT server|
@@ -883,15 +885,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## PutFile
 
-### Description 
+### Description
 
 Writes the contents of a FlowFile to the local file system
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Conflict Resolution Strategy|fail|fail<br>ignore<br>replace<br>|Indicates what should happen when a file with the same name already exists in the output directory|
 |**Create Missing Directories**|true||If true, then missing destination directories will be created. If false, flowfiles are penalized and sent to failure.|
 |Directory|.||The output directory to which to put files<br/>**Supports Expression Language: true**|
@@ -906,15 +908,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## PutOPCProcessor
 
-### Description 
+### Description
 
 Creates/updates  OPC nodes
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Application URI|||Application URI of the client in the format 'urn:unconfigured:application'. Mandatory, if using Secure Channel and must match the URI included in the certificate's Subject Alternative Names.|
 |Certificate path|||Path to the DER-encoded cert file|
 |Key path|||Path to the DER-encoded key file|
@@ -940,15 +942,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## PutSFTP
 
-### Description 
+### Description
 
 Sends FlowFiles to an SFTP Server
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |**Batch Size**|500||The maximum number of FlowFiles to send in a single connection|
 |**Conflict Resolution**|NONE|FAIL<br>IGNORE<br>NONE<br>REJECT<br>RENAME<br>REPLACE<br>|Determines how to handle the problem of filename collisions|
 |**Connection Timeout**|30 sec||Amount of time to wait before timing out while creating a connection|
@@ -989,15 +991,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## PutSQL
 
-### Description 
+### Description
 
 Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command to execute. The SQL command may use the ? character to bind parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The content of the FlowFile is expected to be in UTF-8 format.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |Batch Size|1||The maximum number of flow files to process in one batch|
 |Connection URL|||The database URL to connect to|
 |SQL Statement|||The SQL statement to execute|
@@ -1048,15 +1050,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## RouteOnAttribute
 
-### Description 
+### Description
 
 Routes FlowFiles based on their Attributes using the Attribute Expression Language.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 ### Relationships
 
 | Name | Description |
@@ -1067,15 +1069,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## TailFile
 
-### Description 
+### Description
 
 "Tails" a file, or a list of files, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically "rolled over", as is generally the case with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi was not running (pr [...]
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 |File to Tail|||Fully-qualified filename of the file that should be tailed when using single file mode, or a file regex when using multifile mode|
 |Input Delimiter|||Specifies the character that should be used for delimiting the data being tailedfrom the incoming file.If none is specified, data will be ingested as it becomes available.|
 |State File|TailFileState||Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off|
@@ -1090,15 +1092,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## UnfocusArchiveEntry
 
-### Description 
+### Description
 
 Restores a FlowFile which has had an archive entry focused via FocusArchiveEntry to its original state.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 ### Relationships
 
 | Name | Description |
@@ -1108,15 +1110,15 @@ In the list below, the names of required properties appear in bold. Any other pr
 
 ## UpdateAttribute
 
-### Description 
+### Description
 
 This processor updates the attributes of a FlowFile using properties that are added by the user. This allows you to set default attribute changes that affect every FlowFile going through the processor, equivalent to the "basic" usage in Apache NiFi.
-### Properties 
+### Properties
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
 
-| Name | Default Value | Allowable Values | Description | 
-| - | - | - | - | 
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
 ### Relationships
 
 | Name | Description |
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp
index ade9501..19befa4 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -26,6 +26,8 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+const uint64_t ListenHTTP::DEFAULT_BUFFER_SIZE = 20000;
+
 core::Property ListenHTTP::BasePath(
     core::PropertyBuilder::createProperty("Base Path")
         ->withDescription("Base path for incoming connections")
@@ -53,7 +55,7 @@ core::Property ListenHTTP::SSLVerifyPeer(
 
 core::Property ListenHTTP::SSLMinimumVersion(
     core::PropertyBuilder::createProperty("SSL Minimum Version")
-        -> withDescription("Minimum TLS/SSL version allowed (TLS1.2)")
+        ->withDescription("Minimum TLS/SSL version allowed (TLS1.2)")
         ->isRequired(false)
         ->withAllowableValues<std::string>({"TLS1.2"})
         ->withDefaultValue("TLS1.2")->build());
@@ -62,6 +64,17 @@ core::Property ListenHTTP::HeadersAsAttributesRegex("HTTP Headers to receive as
                                                     " should be passed along as FlowFile attributes",
                                                     "");
 
+core::Property ListenHTTP::BatchSize(
+    core::PropertyBuilder::createProperty("Batch Size")
+        ->withDescription("Maximum number of buffered requests to be processed in a single batch. If set to zero all buffered requests are processed.")
+        ->withDefaultValue<uint64_t>(ListenHTTP::DEFAULT_BUFFER_SIZE)->build());
+
+core::Property ListenHTTP::BufferSize(
+    core::PropertyBuilder::createProperty("Buffer Size")
+        ->withDescription("Maximum number of HTTP Requests allowed to be buffered before processing them when the processor is triggered. "
+                          "If the buffer is full, the request is refused. If set to zero the buffer is unlimited.")
+        ->withDefaultValue<uint64_t>(ListenHTTP::DEFAULT_BUFFER_SIZE)->build());
+
 core::Relationship ListenHTTP::Success("success", "All files are routed to success");
 
 void ListenHTTP::initialize() {
@@ -77,6 +90,8 @@ void ListenHTTP::initialize() {
   properties.insert(SSLVerifyPeer);
   properties.insert(SSLMinimumVersion);
   properties.insert(HeadersAsAttributesRegex);
+  properties.insert(BatchSize);
+  properties.insert(BufferSize);
   setSupportedProperties(properties);
   // Set the supported relationships
   std::set<core::Relationship> relationships;
@@ -191,7 +206,11 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
   }
 
   server_.reset(new CivetServer(options, &callbacks_, &logger_));
-  handler_.reset(new Handler(basePath, context, sessionFactory, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
+
+  context->getProperty(BatchSize.getName(), batch_size_);
+  logger_->log_debug("ListenHTTP using %s: %zu", BatchSize.getName(), batch_size_);
+
+  handler_.reset(new Handler(basePath, context, std::move(authDNPattern), std::move(headersAsAttributesPattern)));
   server_->addHandler(basePath, handler_.get());
 
   if (randomPort) {
@@ -212,9 +231,13 @@ void ListenHTTP::onSchedule(core::ProcessContext *context, core::ProcessSessionF
 ListenHTTP::~ListenHTTP() = default;
 
 void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *session) {
-  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  logger_->log_debug("OnTrigger ListenHTTP");
+  processIncomingFlowFile(session);
+  processRequestBuffer(session);
+}
 
-  // Do nothing if there are no incoming files
+void ListenHTTP::processIncomingFlowFile(core::ProcessSession *session) {
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
   if (!flow_file) {
     return;
   }
@@ -222,41 +245,67 @@ void ListenHTTP::onTrigger(core::ProcessContext *context, core::ProcessSession *
   std::string type;
   flow_file->getAttribute("http.type", type);
 
-  if (type == "response_body") {
-
-    if (handler_) {
-      struct response_body response { "", "", "" };
-      ResponseBodyReadCallback cb(&response.body);
-      flow_file->getAttribute("filename", response.uri);
-      flow_file->getAttribute("mime.type", response.mime_type);
-      if (response.mime_type.empty()) {
-        logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri);
-        response.mime_type = "application/octet-stream";
-      }
-      session->read(flow_file, &cb);
-      handler_->set_response_body(std::move(response));
+  if (type == "response_body" && handler_) {
+    ResponseBody response;
+    ResponseBodyReadCallback cb(&response.body);
+    flow_file->getAttribute("filename", response.uri);
+    flow_file->getAttribute("mime.type", response.mime_type);
+    if (response.mime_type.empty()) {
+      logger_->log_warn("Using default mime type of application/octet-stream for response body file: %s", response.uri);
+      response.mime_type = "application/octet-stream";
     }
+    session->read(flow_file, &cb);
+    handler_->setResponseBody(std::move(response));
   }
 
   session->remove(flow_file);
 }
 
-ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext *context, core::ProcessSessionFactory *session_factory, std::string &&auth_dn_regex, std::string &&header_as_attrs_regex)
+void ListenHTTP::processRequestBuffer(core::ProcessSession *session) {
+  std::size_t flow_file_count = 0;
+  for (; batch_size_ == 0 || batch_size_ > flow_file_count; ++flow_file_count) {
+    FlowFileBufferPair flow_file_buffer_pair;
+    if (!handler_->dequeueRequest(flow_file_buffer_pair)) {
+      break;
+    }
+
+    auto flow_file = flow_file_buffer_pair.first;
+    session->add(flow_file);
+
+    if (flow_file_buffer_pair.second) {
+      WriteCallback callback(std::move(flow_file_buffer_pair.second));
+      session->write(flow_file, &callback);
+    }
+
+    session->transfer(flow_file, Success);
+  }
+
+  logger_->log_debug("ListenHTTP transferred %zu flow files from HTTP request buffer", flow_file_count);
+}
+
+ListenHTTP::Handler::Handler(std::string base_uri, core::ProcessContext *context, std::string &&auth_dn_regex, std::string &&header_as_attrs_regex)
     : base_uri_(std::move(base_uri)),
       auth_dn_regex_(std::move(auth_dn_regex)),
       headers_as_attrs_regex_(std::move(header_as_attrs_regex)),
+      process_context_(context),
       logger_(logging::LoggerFactory<ListenHTTP::Handler>::getLogger()) {
-  process_context_ = context;
-  session_factory_ = session_factory;
+  context->getProperty(BufferSize.getName(), buffer_size_);
+  logger_->log_debug("ListenHTTP using %s: %zu", BufferSize.getName(), buffer_size_);
 }
 
-void ListenHTTP::Handler::send_error_response(struct mg_connection *conn) {
+void ListenHTTP::Handler::sendHttp500(mg_connection* const conn) {
   mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"
-            "Content-Type: text/html\r\n"
-            "Content-Length: 0\r\n\r\n");
+                  "Content-Type: text/html\r\n"
+                  "Content-Length: 0\r\n\r\n");
+}
+
+void ListenHTTP::Handler::sendHttp503(mg_connection* const conn) {
+  mg_printf(conn, "HTTP/1.1 503 Service Unavailable\r\n"
+                  "Content-Type: text/html\r\n"
+                  "Content-Length: 0\r\n\r\n");
 }
 
-void ListenHTTP::Handler::set_header_attributes(const mg_request_info *req_info, const std::shared_ptr<core::FlowFile> &flow_file) const {
+void ListenHTTP::Handler::setHeaderAttributes(const mg_request_info *req_info, const std::shared_ptr<core::FlowFile> &flow_file) const {
   // Add filename from "filename" header value (and pattern headers)
   for (int i = 0; i < req_info->num_headers; i++) {
     auto header = &req_info->http_headers[i];
@@ -273,6 +322,27 @@ void ListenHTTP::Handler::set_header_attributes(const mg_request_info *req_info,
   }
 }
 
+void ListenHTTP::Handler::enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::unique_ptr<io::BufferStream> content_buffer) {
+  auto flow_file = std::make_shared<FlowFileRecord>();
+  auto flow_version = process_context_->getProcessorNode()->getFlowIdentifier();
+  if (flow_version != nullptr) {
+    flow_file->setAttribute(core::SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
+  }
+
+  setHeaderAttributes(req_info, flow_file);
+
+  if (buffer_size_ == 0 || request_buffer_.size() < buffer_size_) {
+    request_buffer_.enqueue(std::make_pair(std::move(flow_file), std::move(content_buffer)));
+  } else {
+    logger_->log_warn("ListenHTTP buffer is full, '%s' request for '%s' uri was dropped", req_info->request_method, req_info->request_uri);
+    sendHttp503(conn);
+    return;
+  }
+
+  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+  writeBody(conn, req_info);
+}
+
 bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *conn) {
   auto req_info = mg_get_request_info(conn);
   if (!req_info) {
@@ -281,47 +351,18 @@ bool ListenHTTP::Handler::handlePost(CivetServer *server, struct mg_connection *
   }
   logger_->log_debug("ListenHTTP handling POST request of length %lld", req_info->content_length);
 
-  if (!auth_request(conn, req_info)) {
+  if (!authRequest(conn, req_info)) {
     return true;
   }
 
   // Always send 100 Continue, as allowed per standard to minimize client delay (https://www.w3.org/Protocols/rfc2616/rfc2616-sec8.html)
   mg_printf(conn, "HTTP/1.1 100 Continue\r\n\r\n");
 
-  auto session = session_factory_->createSession();
-  ListenHTTP::WriteCallback callback(conn, req_info);
-  auto flow_file = session->create();
-
-  if (!flow_file) {
-    send_error_response(conn);
-    return true;
-  }
-
-  try {
-    session->write(flow_file, &callback);
-    set_header_attributes(req_info, flow_file);
-
-    session->transfer(flow_file, Success);
-    session->commit();
-  } catch (std::exception &exception) {
-    logger_->log_error("ListenHTTP Caught Exception %s", exception.what());
-    send_error_response(conn);
-    session->rollback();
-    throw;
-  } catch (...) {
-    logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger");
-    send_error_response(conn);
-    session->rollback();
-    throw;
-  }
-
-  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
-  write_body(conn, req_info);
-
+  enqueueRequest(conn, req_info, createContentBuffer(conn, req_info));
   return true;
 }
 
-bool ListenHTTP::Handler::auth_request(mg_connection *conn, const mg_request_info *req_info) const {
+bool ListenHTTP::Handler::authRequest(mg_connection *conn, const mg_request_info *req_info) const {
   // If this is a two-way TLS connection, authorize the peer against the configured pattern
   bool authorized = true;
   if (req_info->is_ssl && req_info->client_cert != nullptr) {
@@ -344,38 +385,11 @@ bool ListenHTTP::Handler::handleGet(CivetServer *server, struct mg_connection *c
   }
   logger_->log_debug("ListenHTTP handling GET request of URI %s", req_info->request_uri);
 
-  if (!auth_request(conn, req_info)) {
+  if (!authRequest(conn, req_info)) {
     return true;
   }
 
-  auto session = session_factory_->createSession();
-  auto flow_file = session->create();
-
-  if (!flow_file) {
-    send_error_response(conn);
-    return true;
-  }
-
-  try {
-    set_header_attributes(req_info, flow_file);
-
-    session->transfer(flow_file, Success);
-    session->commit();
-  } catch (std::exception &exception) {
-    logger_->log_error("ListenHTTP Caught Exception %s", exception.what());
-    send_error_response(conn);
-    session->rollback();
-    throw;
-  } catch (...) {
-    logger_->log_error("ListenHTTP Caught Exception Processor::onTrigger");
-    send_error_response(conn);
-    session->rollback();
-    throw;
-  }
-
-  mg_printf(conn, "HTTP/1.1 200 OK\r\n");
-  write_body(conn, req_info);
-
+  enqueueRequest(conn, req_info, nullptr);
   return true;
 }
 
@@ -387,21 +401,40 @@ bool ListenHTTP::Handler::handleHead(CivetServer *server, struct mg_connection *
   }
   logger_->log_debug("ListenHTTP handling HEAD request of URI %s", req_info->request_uri);
 
-  if (!auth_request(conn, req_info)) {
+  if (!authRequest(conn, req_info)) {
     return true;
   }
 
   mg_printf(conn, "HTTP/1.1 200 OK\r\n");
-  write_body(conn, req_info, false /*include_payload*/);
+  writeBody(conn, req_info, false /*include_payload*/);
 
   return true;
 }
 
-void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info *req_info, bool include_payload /*=true*/) {
+void ListenHTTP::Handler::setResponseBody(const ResponseBody& response) {
+  std::lock_guard<std::mutex> guard(uri_map_mutex_);
+
+  if (response.body.empty()) {
+    logger_->log_info("Unregistering response body for URI '%s'",
+                      response.uri);
+    response_uri_map_.erase(response.uri);
+  } else {
+    logger_->log_info("Registering response body for URI '%s' of length %lu",
+                      response.uri,
+                      response.body.size());
+    response_uri_map_[response.uri] = std::move(response);
+  }
+}
+
+bool ListenHTTP::Handler::dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair) {
+  return request_buffer_.tryDequeue(flow_file_buffer_pair);
+}
+
+void ListenHTTP::Handler::writeBody(mg_connection *conn, const mg_request_info *req_info, bool include_payload /*=true*/) {
   const auto &request_uri_str = std::string(req_info->request_uri);
 
   if (request_uri_str.size() > base_uri_.size() + 1) {
-    struct response_body response { };
+    ResponseBody response;
 
     {
       // Attempt to minimize time holding mutex (it would be nice to have a lock-free concurrent map here)
@@ -434,16 +467,11 @@ void ListenHTTP::Handler::write_body(mg_connection *conn, const mg_request_info
   }
 }
 
-ListenHTTP::WriteCallback::WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo)
-    : logger_(logging::LoggerFactory<ListenHTTP::WriteCallback>::getLogger()) {
-  conn_ = conn;
-  req_info_ = reqInfo;
-}
-
-int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+std::unique_ptr<io::BufferStream> ListenHTTP::Handler::createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info) {
+  auto content_buffer = utils::make_unique<io::BufferStream>();
   int64_t rlen;
   int64_t nlen = 0;
-  int64_t tlen = req_info_->content_length;
+  int64_t tlen = req_info->content_length;
   uint8_t buf[16384];
 
   // if we have no content length we should call mg_read until
@@ -456,19 +484,27 @@ int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> strea
     }
 
     // Read a buffer of data from client
-    rlen = mg_read(conn_, &buf[0], (size_t) rlen);
+    rlen = mg_read(conn, &buf[0], (size_t) rlen);
 
     if (rlen <= 0) {
       break;
     }
 
     // Transfer buffer data to the output stream
-    stream->write(&buf[0], gsl::narrow<int>(rlen));
+    content_buffer->write(&buf[0], gsl::narrow<int>(rlen));
 
     nlen += rlen;
   }
 
-  return nlen;
+  return content_buffer;
+}
+
+ListenHTTP::WriteCallback::WriteCallback(std::unique_ptr<io::BufferStream> request_content)
+    : request_content_(std::move(request_content)) {
+}
+
+int64_t ListenHTTP::WriteCallback::process(std::shared_ptr<io::BaseStream> stream) {
+  return stream->write(const_cast<uint8_t*>(request_content_->getBuffer()), request_content_->size());
 }
 
 bool ListenHTTP::isSecure() const {
diff --git a/extensions/civetweb/processors/ListenHTTP.h b/extensions/civetweb/processors/ListenHTTP.h
index 568fb29..c774b5e 100644
--- a/extensions/civetweb/processors/ListenHTTP.h
+++ b/extensions/civetweb/processors/ListenHTTP.h
@@ -24,7 +24,6 @@
 #include <regex>
 
 #include <CivetServer.h>
-#include <concurrentqueue.h>
 
 #include "FlowFileRecord.h"
 #include "core/Processor.h"
@@ -32,6 +31,7 @@
 #include "core/Core.h"
 #include "core/Resource.h"
 #include "core/logging/LoggerConfiguration.h"
+#include "utils/MinifiConcurrentQueue.h"
 
 namespace org {
 namespace apache {
@@ -42,6 +42,7 @@ namespace processors {
 // ListenHTTP Class
 class ListenHTTP : public core::Processor {
  public:
+  using FlowFileBufferPair=std::pair<std::shared_ptr<FlowFileRecord>, std::unique_ptr<io::BufferStream>>;
 
   // Constructor
   /*!
@@ -49,9 +50,10 @@ class ListenHTTP : public core::Processor {
    */
   ListenHTTP(std::string name, utils::Identifier uuid = utils::Identifier())
       : Processor(name, uuid),
-        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()) {
-    callbacks_.log_message = &log_message;
-    callbacks_.log_access = &log_access;
+        logger_(logging::LoggerFactory<ListenHTTP>::getLogger()),
+        batch_size_(0) {
+    callbacks_.log_message = &logMessage;
+    callbacks_.log_access = &logAccess;
   }
   // Destructor
   virtual ~ListenHTTP();
@@ -66,6 +68,8 @@ class ListenHTTP : public core::Processor {
   static core::Property SSLVerifyPeer;
   static core::Property SSLMinimumVersion;
   static core::Property HeadersAsAttributesRegex;
+  static core::Property BatchSize;
+  static core::Property BufferSize;
   // Supported Relationships
   static core::Relationship Success;
 
@@ -75,7 +79,7 @@ class ListenHTTP : public core::Processor {
   std::string getPort() const;
   bool isSecure() const;
 
-  struct response_body {
+  struct ResponseBody {
     std::string uri;
     std::string mime_type;
     std::string body;
@@ -86,49 +90,38 @@ class ListenHTTP : public core::Processor {
    public:
     Handler(std::string base_uri,
             core::ProcessContext *context,
-            core::ProcessSessionFactory *sessionFactory,
             std::string &&authDNPattern,
             std::string &&headersAsAttributesPattern);
-    bool handlePost(CivetServer *server, struct mg_connection *conn);
-    bool handleGet(CivetServer *server, struct mg_connection *conn);
-    bool handleHead(CivetServer *server, struct mg_connection *conn);
+    bool handlePost(CivetServer *server, struct mg_connection *conn) override;
+    bool handleGet(CivetServer *server, struct mg_connection *conn) override;
+    bool handleHead(CivetServer *server, struct mg_connection *conn) override;
 
     /**
      * Sets a static response body string to be used for a given URI, with a number of seconds it will be kept in memory.
      * @param response
      */
-    void set_response_body(struct response_body response) {
-      std::lock_guard<std::mutex> guard(uri_map_mutex_);
-
-      if (response.body.empty()) {
-        logger_->log_info("Unregistering response body for URI '%s'",
-                          response.uri);
-        response_uri_map_.erase(response.uri);
-      } else {
-        logger_->log_info("Registering response body for URI '%s' of length %lu",
-                          response.uri,
-                          response.body.size());
-        response_uri_map_[response.uri] = std::move(response);
-      }
-    }
+    void setResponseBody(const ResponseBody& response);
+
+    bool dequeueRequest(FlowFileBufferPair &flow_file_buffer_pair);
 
    private:
-    // Send HTTP 500 error response to client
-    void send_error_response(struct mg_connection *conn);
-    bool auth_request(mg_connection *conn, const mg_request_info *req_info) const;
-    void set_header_attributes(const mg_request_info *req_info, const std::shared_ptr<core::FlowFile> &flow_file) const;
-    void write_body(mg_connection *conn, const mg_request_info *req_info, bool include_payload = true);
+    void sendHttp500(struct mg_connection *conn);
+    void sendHttp503(struct mg_connection *conn);
+    bool authRequest(mg_connection *conn, const mg_request_info *req_info) const;
+    void setHeaderAttributes(const mg_request_info *req_info, const std::shared_ptr<core::FlowFile> &flow_file) const;
+    void writeBody(mg_connection *conn, const mg_request_info *req_info, bool include_payload = true);
+    std::unique_ptr<io::BufferStream> createContentBuffer(struct mg_connection *conn, const struct mg_request_info *req_info);
+    void enqueueRequest(mg_connection *conn, const mg_request_info *req_info, std::unique_ptr<io::BufferStream>);
 
     std::string base_uri_;
     std::regex auth_dn_regex_;
     std::regex headers_as_attrs_regex_;
     core::ProcessContext *process_context_;
-    core::ProcessSessionFactory *session_factory_;
-
-    // Logger
     std::shared_ptr<logging::Logger> logger_;
-    std::map<std::string, response_body> response_uri_map_;
+    std::map<std::string, ResponseBody> response_uri_map_;
     std::mutex uri_map_mutex_;
+    uint64_t buffer_size_;
+    utils::ConcurrentQueue<FlowFileBufferPair> request_buffer_;
   };
 
   class ResponseBodyReadCallback : public InputStreamCallback {
@@ -136,7 +129,7 @@ class ListenHTTP : public core::Processor {
     explicit ResponseBodyReadCallback(std::string *out_str)
         : out_str_(out_str) {
     }
-    int64_t process(std::shared_ptr<io::BaseStream> stream) {
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override {
       out_str_->resize(stream->size());
       uint64_t num_read = stream->read(reinterpret_cast<uint8_t *>(&(*out_str_)[0]),
                                            gsl::narrow<int>(stream->size()));
@@ -155,18 +148,14 @@ class ListenHTTP : public core::Processor {
   // Write callback for transferring data from HTTP request to content repo
   class WriteCallback : public OutputStreamCallback {
    public:
-    WriteCallback(struct mg_connection *conn, const struct mg_request_info *reqInfo);
-    int64_t process(std::shared_ptr<io::BaseStream> stream);
+    WriteCallback(std::unique_ptr<io::BufferStream>);
+    int64_t process(std::shared_ptr<io::BaseStream> stream) override;
 
    private:
-    // Logger
-    std::shared_ptr<logging::Logger> logger_;
-
-    struct mg_connection *conn_;
-    const struct mg_request_info *req_info_;
+    std::unique_ptr<io::BufferStream> request_content_;
   };
 
-  static int log_message(const struct mg_connection *conn, const char *message) {
+  static int logMessage(const struct mg_connection *conn, const char *message) {
     try {
       struct mg_context* ctx = mg_get_context(conn);
       /* CivetServer stores 'this' as the userdata when calling mg_start */
@@ -184,7 +173,7 @@ class ListenHTTP : public core::Processor {
     return 0;
   }
 
-  static int log_access(const struct mg_connection *conn, const char *message) {
+  static int logAccess(const struct mg_connection *conn, const char *message) {
     try {
       struct mg_context* ctx = mg_get_context(conn);
       /* CivetServer stores 'this' as the userdata when calling mg_start */
@@ -205,13 +194,17 @@ class ListenHTTP : public core::Processor {
   void notifyStop() override;
 
  private:
-  // Logger
-  std::shared_ptr<logging::Logger> logger_;
+  static const uint64_t DEFAULT_BUFFER_SIZE;
 
+  void processIncomingFlowFile(core::ProcessSession *session);
+  void processRequestBuffer(core::ProcessSession *session);
+
+  std::shared_ptr<logging::Logger> logger_;
   CivetCallbacks callbacks_;
   std::unique_ptr<CivetServer> server_;
   std::unique_ptr<Handler> handler_;
   std::string listeningPort;
+  uint64_t batch_size_;
 };
 
 REGISTER_RESOURCE(ListenHTTP, "Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be "
diff --git a/extensions/civetweb/tests/ListenHTTPTests.cpp b/extensions/civetweb/tests/ListenHTTPTests.cpp
index df6a324..8a1ffa9 100644
--- a/extensions/civetweb/tests/ListenHTTPTests.cpp
+++ b/extensions/civetweb/tests/ListenHTTPTests.cpp
@@ -35,6 +35,14 @@
 
 class ListenHTTPTestsFixture {
  public:
+  struct HttpResponseExpectations {
+    HttpResponseExpectations(bool s, int64_t r) : should_succeed(s), response_code(r) {}
+    HttpResponseExpectations() : HttpResponseExpectations(true, 200) {}
+
+    bool should_succeed;
+    int64_t response_code;
+  };
+
   ListenHTTPTestsFixture() {
     LogTestController::getInstance().setDebug<TestPlan>();
     LogTestController::getInstance().setDebug<minifi::FlowController>();
@@ -89,6 +97,8 @@ class ListenHTTPTestsFixture {
 
     // Configure ListenHTTP processor
     plan->setProperty(listen_http, "Listening Port", "0");
+
+    plan->setProperty(log_attribute, "FlowFiles To Log", "0");
   }
 
   virtual ~ListenHTTPTestsFixture() {
@@ -110,6 +120,9 @@ class ListenHTTPTestsFixture {
   }
 
   void run_server() {
+    plan->setProperty(listen_http, "Batch Size", std::to_string(batch_size_));
+    plan->setProperty(listen_http, "Buffer Size", std::to_string(buffer_size_));
+
     plan->runNextProcessor();  // GetFile
     plan->runNextProcessor();  // UpdateAttribute
     plan->runNextProcessor();  // ListenHTTP
@@ -122,49 +135,79 @@ class ListenHTTPTestsFixture {
     url = protocol + "://localhost:" + portstr + "/contentListener/" + endpoint;
   }
 
-  void test_connect(bool should_succeed = true, int64_t response_code = 200) {
-    if (client == nullptr) {
-      client = std::unique_ptr<utils::HTTPClient>(new utils::HTTPClient());
-      client->initialize(method, url, ssl_context_service);
-      client->setVerbose();
-      for (const auto &header : headers) {
-        client->appendHeader(header.first, header.second);
-      }
-      if (method == "POST") {
-        client->setPostFields(payload);
-      }
+  void initialize_client() {
+    if (client != nullptr) {
+      return;
+    }
+
+    client = std::unique_ptr<utils::HTTPClient>(new utils::HTTPClient());
+    client->initialize(method, url, ssl_context_service);
+    client->setVerbose();
+    for (const auto &header : headers) {
+      client->appendHeader(header.first, header.second);
     }
-    auto res = client->submit();
-    if (should_succeed) {
-      REQUIRE(res);
-      REQUIRE(response_code == client->getResponseCode());
-      if (response_code == 200) {
-        if (endpoint == "test") {
-          std::string content_type;
-          if (!update_attribute->getDynamicProperty("mime.type", content_type)) {
-            content_type = "application/octet-stream";
-          }
-          REQUIRE(content_type == utils::StringUtils::trim(client->getParsedHeaders().at("Content-type")));
-          REQUIRE("19" == utils::StringUtils::trim(client->getParsedHeaders().at("Content-length")));
-        } else {
-          REQUIRE("0" == utils::StringUtils::trim(client->getParsedHeaders().at("Content-length")));
-        }
-        if (method == "GET" || method == "POST") {
-          const auto &body_chars = client->getResponseBody();
-          std::string response_body(body_chars.data(), body_chars.size());
-          if (endpoint == "test") {
-            REQUIRE("Hello response body" == response_body);
-          } else {
-            REQUIRE("" == response_body);
-          }
-
-          plan->runNextProcessor();  // LogAttribute
-          REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(payload.size()) + " Offset:0"));
-        }
+    if (method == "POST") {
+      client->setPostFields(payload);
+    }
+  }
+
+  void check_content_type() {
+    if (endpoint == "test") {
+      std::string content_type;
+      if (!update_attribute->getDynamicProperty("mime.type", content_type)) {
+        content_type = "application/octet-stream";
       }
+      REQUIRE(content_type == utils::StringUtils::trim(client->getParsedHeaders().at("Content-type")));
+      REQUIRE("19" == utils::StringUtils::trim(client->getParsedHeaders().at("Content-length")));
     } else {
-      REQUIRE(!res);
+      REQUIRE("0" == utils::StringUtils::trim(client->getParsedHeaders().at("Content-length")));
+    }
+  }
+
+  void check_response_body() {
+    if (method != "GET" && method != "POST") {
+      return;
+    }
+
+    const auto &body_chars = client->getResponseBody();
+    std::string response_body(body_chars.data(), body_chars.size());
+    if (endpoint == "test") {
+      REQUIRE("Hello response body" == response_body);
+    } else {
+      REQUIRE("" == response_body);
+    }
+  }
+
+  void check_response(const bool success, const HttpResponseExpectations& expect) {
+    if (!expect.should_succeed) {
+      REQUIRE(!success);
+      REQUIRE(expect.response_code == client->getResponseCode());
+      return;
+    }
+
+    REQUIRE(success);
+    REQUIRE(expect.response_code == client->getResponseCode());
+    if (expect.response_code != 200) {
+      return;
+    }
+
+    check_content_type();
+    check_response_body();
+  }
+
+  void test_connect(const std::vector<HttpResponseExpectations>& response_expectaitons = {HttpResponseExpectations{}}, std::size_t expected_commited_requests = 1) {
+    initialize_client();
+
+    for (const auto& expect : response_expectaitons) {
+      check_response(client->submit(), expect);
+    }
+
+    plan->runCurrentProcessor();  // ListenHTTP
+    plan->runNextProcessor();  // LogAttribute
+    if (expected_commited_requests > 0 && (method == "GET" || method == "POST")) {
+      REQUIRE(LogTestController::getInstance().contains("Size:" + std::to_string(payload.size()) + " Offset:0"));
     }
+    REQUIRE(LogTestController::getInstance().contains("Logged " + std::to_string(expected_commited_requests) + " flow files"));
   }
 
  protected:
@@ -183,6 +226,8 @@ class ListenHTTPTestsFixture {
   std::string endpoint = "test";
   std::string url;
   std::unique_ptr<utils::HTTPClient> client;
+  std::size_t batch_size_ = 0;
+  std::size_t buffer_size_ = 0;
 };
 
 TEST_CASE("ListenHTTP creation", "[basic]") {
@@ -211,7 +256,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP PUT", "[basic]") {
 
   method = "PUT";
 
-  test_connect(true /*should_succeed*/, 405);
+  test_connect({HttpResponseExpectations{true, 405}}, 0);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP DELETE", "[basic]") {
@@ -219,7 +264,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP DELETE", "[basic]") {
 
   method = "DELETE";
 
-  test_connect(true /*should_succeed*/, 405);
+  test_connect({HttpResponseExpectations{true, 405}}, 0);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP HEAD", "[basic]") {
@@ -227,12 +272,13 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP HEAD", "[basic]") {
 
   method = "HEAD";
 
-  test_connect();
+  test_connect({HttpResponseExpectations{}}, 0);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP no body", "[basic]") {
   endpoint = "test2";
 
+
   SECTION("GET") {
     method = "GET";
   }
@@ -245,7 +291,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP no body", "[basic]") {
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP body with different mime type", "[basic][mime]") {
@@ -263,7 +310,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP body with different mime type", "
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP all headers", "[basic][headers]") {
@@ -308,6 +356,65 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP filtered headers", "[headers]") {
   REQUIRE(false == LogTestController::getInstance().contains("key:bar value:2", std::chrono::seconds(0) /*timeout*/));
 }
 
+TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTP Batch tests", "[batch]") {
+  std::size_t expected_processed_request_count = 0;
+  std::vector<HttpResponseExpectations> requests;
+  auto create_requests = [&](std::size_t successful, std::size_t failed) {
+    for (auto i = 0; i < successful; ++i) {
+      requests.push_back(HttpResponseExpectations{true,200});
+    }
+    for (auto i = 0; i < failed; ++i) {
+      requests.push_back(HttpResponseExpectations{true,503});
+    }
+  };
+
+  SECTION("Batch size same as request count") {
+    batch_size_ = 5;
+    create_requests(5, 0);
+    expected_processed_request_count = 5;
+
+    SECTION("GET") {
+      method = "GET";
+    }
+    SECTION("POST") {
+      method = "POST";
+      payload = "Test payload";
+    }
+  }
+
+  SECTION("Batch size smaller than request count") {
+    batch_size_ = 4;
+    create_requests(5, 0);
+    expected_processed_request_count = 4;
+
+    SECTION("GET") {
+      method = "GET";
+    }
+    SECTION("POST") {
+      method = "POST";
+      payload = "Test payload";
+    }
+  }
+
+  SECTION("Buffer size smaller than request count") {
+    batch_size_ = 5;
+    buffer_size_ = 3;
+    create_requests(3, 2);
+    expected_processed_request_count = 3;
+
+    SECTION("GET") {
+      method = "GET";
+    }
+    SECTION("POST") {
+      method = "POST";
+      payload = "Test payload";
+    }
+  }
+
+  run_server();
+  test_connect(requests, expected_processed_request_count);
+}
+
 #ifdef OPENSSL_SUPPORT
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without CA", "[basic][https]") {
   plan->setProperty(listen_http, "SSL Certificate", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/server.pem"));
@@ -326,7 +433,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without CA", "[basic][https]") {
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without client cert", "[basic][https]") {
@@ -347,7 +455,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS without client cert", "[basic][h
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from good CA", "[https]") {
@@ -369,7 +478,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from good CA",
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with PKCS12 client cert from good CA", "[https]") {
@@ -391,7 +501,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with PKCS12 client cert from goo
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", "[https]") {
@@ -399,6 +510,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", "
   plan->setProperty(listen_http, "SSL Certificate Authority", utils::file::FileUtils::concat_path(utils::file::FileUtils::get_executable_dir(), "resources/goodCA.crt"));
 
   bool should_succeed = false;
+  int64_t response_code = 0;
+  std::size_t expected_commited_requests = 0;
   SECTION("verify peer") {
     should_succeed = false;
     plan->setProperty(listen_http, "SSL Verify Peer", "yes");
@@ -416,13 +529,16 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", "
   }
   SECTION("do not verify peer") {
     should_succeed = true;
+    response_code = 200;
 
     SECTION("GET") {
       method = "GET";
+      expected_commited_requests = 1;
     }
     SECTION("POST") {
       method = "POST";
       payload = "Test payload";
+      expected_commited_requests = 1;
     }
     SECTION("HEAD") {
       method = "HEAD";
@@ -432,7 +548,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert from bad CA", "
   create_ssl_context_service("goodCA.crt", "badCA_goodClient.pem");
 
   run_server();
-  test_connect(should_succeed);
+  test_connect({HttpResponseExpectations{should_succeed, response_code}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with matching DN", "[https][DN]") {
@@ -455,7 +571,8 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with matching D
   }
 
   run_server();
-  test_connect();
+  const std::size_t expected_commited_requests = (method == "POST" || method == "GET") ? 1 : 0;
+  test_connect({HttpResponseExpectations{}}, expected_commited_requests);
 }
 
 TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matching DN", "[https][DN]") {
@@ -464,6 +581,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matchi
   plan->setProperty(listen_http, "Authorized DN Pattern", ".*/CN=good\\..*");
 
   int64_t response_code = 0;
+  std::size_t expected_commited_requests = 0;
   SECTION("verify peer") {
     plan->setProperty(listen_http, "SSL Verify Peer", "yes");
     response_code = 403;
@@ -484,10 +602,12 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matchi
 
     SECTION("GET") {
       method = "GET";
+      expected_commited_requests = 1;
     }
     SECTION("POST") {
       method = "POST";
       payload = "Test payload";
+      expected_commited_requests = 1;
     }
     SECTION("HEAD") {
       method = "HEAD";
@@ -497,7 +617,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS with client cert with non-matchi
   create_ssl_context_service("goodCA.crt", "goodCA_badClient.pem");
 
   run_server();
-  test_connect(true /*should_succeed*/, response_code /*response_code*/);
+  test_connect({HttpResponseExpectations{true, response_code}}, expected_commited_requests);
 }
 
 #if CURL_AT_LEAST_VERSION(7, 54, 0)
@@ -528,7 +648,7 @@ TEST_CASE_METHOD(ListenHTTPTestsFixture, "HTTPS minimum SSL version", "[https]")
   }
   REQUIRE(client->setSpecificSSLVersion(utils::SSLVersion::TLSv1_1));
 
-  test_connect(false /*should_succeed*/);
+  test_connect({HttpResponseExpectations{false, 0}}, 0);
 }
 #endif
 #endif
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 2ca3c2b..512d7c8 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -33,7 +33,7 @@
 
 class HttpTestHarness : public HTTPIntegrationBase {
 public:
-  HttpTestHarness() {
+  HttpTestHarness() : HTTPIntegrationBase(4000) {
     char format[] = "/tmp/ssth.XXXXXX";
     dir = testController.createTempDirectory(format);
   }
diff --git a/libminifi/src/io/FileStream.cpp b/libminifi/src/io/FileStream.cpp
index 7df6bb3..700ec9f 100644
--- a/libminifi/src/io/FileStream.cpp
+++ b/libminifi/src/io/FileStream.cpp
@@ -122,7 +122,12 @@ int FileStream::read(uint8_t *buf, int buflen) {
       file_stream_->clear();
       file_stream_->seekg(0, file_stream_->end);
       file_stream_->seekp(0, file_stream_->end);
-      size_t len = gsl::narrow<size_t>(file_stream_->tellg());
+      auto tellg_result = file_stream_->tellg();
+      if (tellg_result < 0) {
+        logging::LOG_ERROR(logger_) << "Tellg call on file stream failed.";
+        return -1;
+      }
+      size_t len = gsl::narrow<size_t>(tellg_result);
       size_t ret = len - offset_;
       offset_ = len;
       length_ = len;