You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by fg...@apache.org on 2022/01/17 11:22:54 UTC

[nifi-minifi-cpp] 04/04: MINIFICPP-1691: PutSplunkHTTP and QuerySplunkIndexingStatus

This is an automated email from the ASF dual-hosted git repository.

fgerlits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit de987db11ab46424fac0b073bd43407f6a5ad4ac
Author: Martin Zink <ma...@protonmail.com>
AuthorDate: Fri Oct 29 12:51:35 2021 +0200

    MINIFICPP-1691: PutSplunkHTTP and QuerySplunkIndexingStatus
    
    Co-authored-by: Márton Szász <sz...@gmail.com>
    Signed-off-by: Ferenc Gerlits <fg...@gmail.com>
    
    This closes #1219
---
 .github/workflows/ci.yml                           |   4 +-
 CMakeLists.txt                                     |   6 +
 PROCESSORS.md                                      | 105 ++++++++++
 README.md                                          |   1 +
 bootstrap.sh                                       |   2 +
 bstrp_functions.sh                                 |   6 +-
 cmake/DockerConfig.cmake                           |   1 +
 docker/Dockerfile                                  |   3 +-
 docker/requirements.txt                            |   1 +
 .../integration/MiNiFi_integration_test_driver.py  |  12 ++
 docker/test/integration/features/splunk.feature    |  50 +++++
 .../integration/minifi/core/DockerTestCluster.py   |  71 +++++++
 docker/test/integration/minifi/core/ImageStore.py  |   5 +
 .../test/integration/minifi/core/SSL_cert_utils.py |  79 +++++++
 .../minifi/core/SingleNodeDockerCluster.py         |   3 +
 .../integration/minifi/core/SplunkContainer.py     |  28 +++
 .../integration/minifi/processors/PutSplunkHTTP.py |  14 ++
 .../minifi/processors/QuerySplunkIndexingStatus.py |  15 ++
 .../integration/resources/splunk-hec/Dockerfile    |   2 +
 .../resources/splunk-hec/conf/default.yml          |   6 +
 docker/test/integration/steps/steps.py             |  50 ++++-
 extensions/http-curl/client/HTTPClient.cpp         |   5 +-
 extensions/http-curl/processors/InvokeHTTP.cpp     |   7 -
 .../http-curl/tests/unit/HTTPClientTests.cpp       |   6 +
 extensions/splunk/CMakeLists.txt                   |  33 +++
 extensions/splunk/PutSplunkHTTP.cpp                | 176 ++++++++++++++++
 extensions/splunk/PutSplunkHTTP.h                  |  58 +++++
 extensions/splunk/QuerySplunkIndexingStatus.cpp    | 202 ++++++++++++++++++
 extensions/splunk/QuerySplunkIndexingStatus.h      |  61 ++++++
 extensions/splunk/SplunkAttributes.h               |  27 +++
 extensions/splunk/SplunkHECProcessor.cpp           |  81 +++++++
 extensions/splunk/SplunkHECProcessor.h             |  61 ++++++
 extensions/splunk/tests/CMakeLists.txt             |  41 ++++
 extensions/splunk/tests/MockSplunkHEC.h            | 233 +++++++++++++++++++++
 extensions/splunk/tests/PutSplunkHTTPTests.cpp     | 168 +++++++++++++++
 .../tests/QuerySplunkIndexingStatusTests.cpp       | 151 +++++++++++++
 .../tests/unit/ProcessorTests.cpp                  |   2 +-
 libminifi/test/ReadFromFlowFileTestProcessor.cpp   |  28 ++-
 libminifi/test/ReadFromFlowFileTestProcessor.h     |  16 +-
 libminifi/test/TestBase.cpp                        |   4 +
 libminifi/test/TestBase.h                          |   1 +
 win_build_vs.bat                                   |   3 +-
 42 files changed, 1805 insertions(+), 23 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7cb2b51..b94c80c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -81,7 +81,7 @@ jobs:
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
-          win_build_vs.bat ..\b /64 /CI /S /A /PDH /K /L /R /Z /N /RO
+          win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /K /L /R /Z /N /RO
         shell: cmd
       - name: test
         run: cd ..\b && ctest --timeout 300 --parallel 8 -C Release --output-on-failure
@@ -202,7 +202,7 @@ jobs:
           if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
           mkdir build
           cd build
-          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_OPC=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+          cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF -DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON -DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
           make docker
       - id: install_deps
         run: |
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7c3f134..23cc2df 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -115,6 +115,7 @@ option(ENABLE_SFTP "Enables SFTP support." OFF)
 option(ENABLE_OPENWSMAN "Enables the Openwsman extensions." OFF)
 option(ENABLE_AZURE "Enables Azure support." OFF)
 option(ENABLE_ENCRYPT_CONFIG "Enables build of encrypt-config binary." ON)
+option(ENABLE_SPLUNK "Enable Splunk support" OFF)
 option(DOCKER_BUILD_ONLY "Disables all targets except docker build scripts. Ideal for systems without an up-to-date compiler." OFF)
 
 ## Keep all option definitions above this line
@@ -598,6 +599,11 @@ if (ENABLE_SYSTEMD)
 	createExtension(SYSTEMD-EXTENSIONS "SYSTEMD EXTENSIONS" "Enabled log collection from journald" "extensions/systemd" "extensions/systemd/tests")
 endif()
 
+## Add the splunk extension
+if (ENABLE_ALL OR ENABLE_SPLUNK)
+	createExtension(SPLUNK-EXTENSIONS "SPLUNK EXTENSIONS" "This enables Splunk support" "extensions/splunk" "extensions/splunk/tests")
+endif()
+
 ## NOW WE CAN ADD LIBRARIES AND EXTENSIONS TO MAIN
 add_subdirectory(main)
 
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 6a2dc8e..3446c98 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -50,9 +50,11 @@
 - [PutOPCProcessor](#putopcprocessor)
 - [PutS3Object](#puts3object)
 - [PutSFTP](#putsftp)
+- [PutSplunkHTTP](#putsplunkhttp)
 - [PutSQL](#putsql)
 - [PutUDP](#putudp)
 - [QueryDatabaseTable](#querydatabasetable)
+- [QuerySplunkIndexingStatus](#querysplunkindexingstatus)
 - [ReplaceText](#replacetext)
 - [RetryFlowFile](#retryflowfile)
 - [RouteOnAttribute](#routeonattribute)
@@ -1503,6 +1505,54 @@ In the list below, the names of required properties appear in bold. Any other pr
 |success|FlowFiles that are successfully sent will be routed to success|
 
 
+## PutSplunkHTTP
+
+### Description
+Sends the flow file contents to the specified [Splunk HTTP Event Collector](https://docs.splunk.com/Documentation/SplunkCloud/latest/Data/UsetheHTTPEventCollector) over HTTP or HTTPS.
+
+#### Event parameters
+The "Source", "Source Type", "Host" and "Index" properties are optional and will be set by Splunk if unspecified. If set,
+the default values will be overwritten with the user specified ones. For more details about the Splunk API, please visit
+[this documentation](https://docs.splunk.com/Documentation/Splunk/LATEST/RESTREF/RESTinput#services.2Fcollector.2Fraw)
+
+#### Acknowledgment
+HTTP Event Collector (HEC) in Splunk provides the possibility of index acknowledgement, which can be used to monitor
+the indexing status of the individual events. PutSplunkHTTP supports this feature by enriching the outgoing flow file
+with the necessary information, making it possible for a later processor to poll the status based on them. The necessary
+information for this is stored within flow file attributes "splunk.acknowledgement.id" and "splunk.responded.at".
+
+#### Error information
+For more refined processing, flow files are enriched with additional information if possible. The information is stored
+in the flow file attribute "splunk.status.code" or "splunk.response.code", depending on the success of the processing.
+The attribute "splunk.status.code" is always filled when the Splunk API call is executed and contains the HTTP status code
+of the response. In case the flow file is transferred into the "failure" relationship, the "splunk.response.code" might
+also be filled, based on the Splunk response code.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                        | Default Value | Allowable Values                   | Description                                                                                                                                                                                                                                                                                                                                                                                   |
+| --------------------------  | ------------- | ---------------------------------  | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| **Hostname**                |               |                                    | The ip address or hostname of the Splunk server.                                                                                                                                                                                                                                                                                                                                              |
+| **Port**                    | 8088          |                                    | The HTTP Event Collector HTTP Port Number.                                                                                                                                                                                                                                                                                                                                                    |
+| **Token**                   |               | Splunk &lt;guid&gt;                | HTTP Event Collector token starting with the string Splunk. For example `Splunk 1234578-abcd-1234-abcd-1234abcd`                                                                                                                                                                                                                                                                              |
+| **Splunk Request Channel**  |               | &lt;guid&gt;                       | Identifier of the used request channel.                                                                                                                                                                                                                                                                                                                                                       |
+| SSL Context Service         |               |                                    | The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.                                                                                                                                                                                                                                                                                       |
+| Source                      |               |                                    | Basic field describing the source of the event. If unspecified, the event will use the default defined in splunk.                                                                                                                                                                                                                                                                             |
+| SourceType                  |               |                                    | Basic field describing the source type of the event. If unspecified, the event will use the default defined in splunk.                                                                                                                                                                                                                                                                        |
+| Host                        |               |                                    | Basic field describing the host of the event. If unspecified, the event will use the default defined in splunk.                                                                                                                                                                                                                                                                               |
+| Index                       |               |                                    | Identifies the index where to send the event. If unspecified, the event will use the default defined in splunk.                                                                                                                                                                                                                                                                               |
+| Content Type                |               |                                    | The media type of the event sent to Splunk. If not set, "mime.type" flow file attribute will be used. If neither of them is specified, this information will not be sent to the server.                                                                                                                                                                                                       |
+
+### Relationships
+
+| Name     | Description                                                                            |
+| -------- | -------------------------------------------------------------------------------------- |
+| success  | FlowFiles that are sent successfully to the destination are sent to this relationship. |
+| failure  | FlowFiles that failed to be sent to the destination are sent to this relationship.     |
+
+
 ## PutSQL
 
 ### Description
@@ -1522,6 +1572,7 @@ In the list below, the names of required properties appear in bold. Any other pr
 | - | - |
 |success|After a successful SQL update operation, the incoming FlowFile sent here|
 
+
 ## PutUDP
 
 ### Description
@@ -1574,6 +1625,60 @@ In the list below, the names of required properties appear in bold. Any other pr
 |initial.maxvalue.<max_value_column>|Initial maximum value for the specified column|Specifies an initial max value for max value column(s). Properties should be added in the format `initial.maxvalue.<max_value_column>`. This value is only used the first time the table is accessed (when a Maximum Value Column is specified).<br/>**Supports Expression Language: true**|
 
 
+## QuerySplunkIndexingStatus
+
+### Description
+
+This processor is responsible for polling the Splunk server and determining whether a Splunk event is acknowledged at the time of
+execution. For more details about the HEC Index Acknowledgement please see
+[this documentation.](https://docs.splunk.com/Documentation/Splunk/LATEST/Data/AboutHECIDXAck)
+
+#### Input requirements
+In order to work properly, the incoming flow files need to have the attributes "splunk.acknowledgement.id" and
+"splunk.responded.at" filled properly. The flow file attribute "splunk.acknowledgement.id" should contain the "ackId"
+which can be extracted from the response to the original Splunk put call. The flow file attribute "splunk.responded.at"
+should contain the timestamp describing when the put call was answered by Splunk.
+These required attributes are set by PutSplunkHTTP processor.
+
+#### Undetermined relationship
+Undetermined cases are normal in a healthy environment as it is possible that minifi asks for the indexing status before Splunk
+finishes and acknowledges it. These cases are safe to retry, and it is suggested to loop the "undetermined" relationship
+back to the processor for a later retry. Flow files transferred into the "Undetermined" relationship are penalized.
+
+#### Performance
+Please keep Splunk channel limitations in mind: there are multiple configuration parameters in Splunk which might have direct
+effect on the performance and behaviour of the QuerySplunkIndexingStatus processor. For example "max_number_of_acked_requests_pending_query"
+and "max_number_of_acked_requests_pending_query_per_ack_channel" might limit the amount of ackIDs Splunk stores.
+
+Also, it is suggested to execute the query in batches. The "Maximum Query Size" property might be used to fine-tune
+the maximum number of events the processor will query about in one API request. This serves as an upper limit for the
+batch, but the processor might execute the query with a smaller number of undetermined events.
+
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                        | Default Value | Allowable Values                   | Description                                                                                                                                                                                                                                                                                                                                                                                   |
+| --------------------------  | ------------- | ---------------------------------  | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| **Maximum Waiting Time**    | 1 hour        | &lt;duration&gt; &lt;time unit&gt; | The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration.<br> After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the _unacknowledged_ relationship.                                                                                                         |
+| **Maximum Query Size**      | 1000          | integers                           | The maximum number of acknowledgement identifiers the outgoing query contains in one batch.  It is recommended not to set it too low in order to reduce network communication.                                                                                                                                                                                                                |
+| **Hostname**                |               |                                    | The ip address or hostname of the Splunk server.                                                                                                                                                                                                                                                                                                                                              |
+| **Port**                    | 8088          |                                    | The HTTP Event Collector HTTP Port Number.                                                                                                                                                                                                                                                                                                                                                    |
+| **Token**                   |               | Splunk &lt;guid&gt;                | HTTP Event Collector token starting with the string Splunk. For example `Splunk 1234578-abcd-1234-abcd-1234abcd`                                                                                                                                                                                                                                                                              |
+| **Splunk Request Channel**  |               | &lt;guid&gt;                       | Identifier of the used request channel.                                                                                                                                                                                                                                                                                                                                                       |
+| SSL Context Service         |               |                                    | The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.                                                                                                                                                                                                                                                                                       |
+
+### Relationships
+
+| Name           | Description                                                                                                                                                                                                                                                                                                                                                          |
+| -------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| acknowledged   | A FlowFile is transferred to this relationship when the acknowledgement was successful.                                                                                                                                                                                                                                                                              |
+| unacknowledged | A FlowFile is transferred to this relationship when the acknowledgement was not successful.<br>This can happen when the acknowledgement did not happen within the time period set for Maximum Waiting Time.<br>FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached. |
+| undetermined   | A FlowFile is transferred to this relationship when the acknowledgement state is not determined.<br> This can happen when Splunk returns with HTTP 200 but with a false response for the acknowledgement id in the flow file attribute.<br>FlowFiles transferred to this relationship might be penalized.<br>                                                                  |
+| failure        | A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication, or if the flowfile was missing the acknowledgement id                                                                                                                                                                             |
+
+
 ## ReplaceText
 
 ### Description
diff --git a/README.md b/README.md
index 7dac13e..5ba3e85 100644
--- a/README.md
+++ b/README.md
@@ -93,6 +93,7 @@ Through JNI extensions you can run NiFi processors using NARs. The JNI extension
 | Sensors | GetEnvironmentalSensors<br/>GetMovementSensors | -DENABLE_SENSORS=ON |
 | SFTP | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp) | -DENABLE_SFTP=ON |
 | SQL | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/> | -DENABLE_SQL=ON  |
+| Splunk | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus)| -DENABLE_SPLUNK=ON  |
 | Systemd | [ConsumeJournald](PROCESSORS.md#consumejournald) | -DENABLE_SYSTEMD=ON |
 | Tensorflow | TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/>      |    -DENABLE_TENSORFLOW=ON  |
 | USB Camera | [GetUSBCamera](PROCESSORS.md#getusbcamera)     |    -DENABLE_USB_CAMERA=ON  |
diff --git a/bootstrap.sh b/bootstrap.sh
index 503db50..00f6c44 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -332,6 +332,8 @@ add_disabled_option SYSTEMD_ENABLED ${TRUE} "ENABLE_SYSTEMD"
 add_disabled_option NANOFI_ENABLED ${FALSE} "ENABLE_NANOFI"
 set_dependency PYTHON_ENABLED NANOFI_ENABLED
 
+add_disabled_option SPLUNK_ENABLED ${FALSE} "ENABLE_SPLUNK"
+
 USE_SHARED_LIBS=${TRUE}
 ASAN_ENABLED=${FALSE}
 FAIL_ON_WARNINGS=${FALSE}
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 66d08bb..5f5f3e7 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -391,6 +391,7 @@ show_supported_features() {
   echo "X. Azure Support ...............$(print_feature_status AZURE_ENABLED)"
   echo "Y. Systemd Support .............$(print_feature_status SYSTEMD_ENABLED)"
   echo "Z. NanoFi Support ..............$(print_feature_status NANOFI_ENABLED)"
+  echo "AA. Splunk Support .............$(print_feature_status SPLUNK_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -413,7 +414,7 @@ show_supported_features() {
 
 read_feature_options(){
   local choice
-  echo -n "Enter choice [ A - Z or 1-7 ] "
+  echo -n "Enter choice [ A - Z or AA or 1-7] "
   read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
@@ -445,6 +446,7 @@ read_feature_options(){
     x) ToggleFeature AZURE_ENABLED ;;
     y) ToggleFeature SYSTEMD_ENABLED ;;
     z) ToggleFeature NANOFI_ENABLED ;;
+    aa) ToggleFeature SPLUNK_ENABLED ;;
     1) ToggleFeature TESTS_ENABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
@@ -463,7 +465,7 @@ read_feature_options(){
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-Z or 1-7...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-Z or AA or 1-7...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 6520200..4c10ccf 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -44,6 +44,7 @@ add_custom_target(
         -c ENABLE_AZURE=${ENABLE_AZURE}
         -c ENABLE_ENCRYPT_CONFIG=${ENABLE_ENCRYPT_CONFIG}
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
+        -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c DISABLE_CURL=${DISABLE_CURL}
         -c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
         -c DISABLE_CIVET=${DISABLE_CIVET}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 2e322c9..cca67f4 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -50,6 +50,7 @@ ARG ENABLE_OPENWSMAN=OFF
 ARG ENABLE_AZURE=OFF
 ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
+ARG ENABLE_SPLUNK=OFF
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
 ARG DISABLE_CIVET=OFF
@@ -114,7 +115,7 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL
     -DENABLE_LIBRDKAFKA="${ENABLE_LIBRDKAFKA}" -DENABLE_SENSORS="${ENABLE_SENSORS}" -DENABLE_USB_CAMERA="${ENABLE_USB_CAMERA}" \
     -DENABLE_TENSORFLOW="${ENABLE_TENSORFLOW}" -DENABLE_AWS="${ENABLE_AWS}" -DENABLE_BUSTACHE="${ENABLE_BUSTACHE}" -DENABLE_SFTP="${ENABLE_SFTP}" \
     -DENABLE_OPENWSMAN="${ENABLE_OPENWSMAN}" -DENABLE_AZURE="${ENABLE_AZURE}" -DENABLE_NANOFI=${ENABLE_NANOFI} -DENABLE_SYSTEMD=OFF \
-    -DDISABLE_CURL="${DISABLE_CURL}" -DDISABLE_JEMALLOC="${DISABLE_JEMALLOC}" -DDISABLE_CIVET="${DISABLE_CIVET}" \
+    -DDISABLE_CURL="${DISABLE_CURL}" -DDISABLE_JEMALLOC="${DISABLE_JEMALLOC}" -DDISABLE_CIVET="${DISABLE_CIVET}" -DENABLE_SPLUNK=${ENABLE_SPLUNK} \
     -DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}" -DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" \
diff --git a/docker/requirements.txt b/docker/requirements.txt
index f6a1785..b200eb4 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -6,3 +6,4 @@ confluent-kafka==1.7.0
 PyYAML==5.4.1
 m2crypto==0.37.1
 watchdog==2.1.2
+pyopenssl==21.0.0
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index 34bc202..390b22e 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -60,6 +60,12 @@ class MiNiFi_integration_test():
         self.cluster.deploy('kafka-broker')
         assert self.wait_for_container_startup_to_finish('kafka-broker')
 
+    def start_splunk(self):
+        self.cluster.acquire_container('splunk', 'splunk')
+        self.cluster.deploy('splunk')
+        assert self.wait_for_container_startup_to_finish('splunk')
+        assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token')
+
     def start(self):
         logging.info("MiNiFi_integration_test start")
         self.cluster.deploy_flow()
@@ -191,6 +197,12 @@ class MiNiFi_integration_test():
     def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
         assert self.cluster.wait_for_kafka_consumer_to_be_registered(kafka_container_name)
 
+    def check_splunk_event(self, splunk_container_name, query):
+        assert self.cluster.check_splunk_event(splunk_container_name, query)
+
+    def check_splunk_event_with_attributes(self, splunk_container_name, query, attributes):
+        assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes)
+
     def check_minifi_log_contents(self, line, timeout_seconds=60):
         self.check_container_log_contents("minifi-cpp", line, timeout_seconds)
 
diff --git a/docker/test/integration/features/splunk.feature b/docker/test/integration/features/splunk.feature
new file mode 100644
index 0000000..87aac0a
--- /dev/null
+++ b/docker/test/integration/features/splunk.feature
@@ -0,0 +1,50 @@
+Feature: Sending data to Splunk HEC using PutSplunkHTTP
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to a Splunk HEC
+    Given a Splunk HEC is set up and running
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "foobar" is present in "/tmp/input"
+    And a PutSplunkHTTP processor set up to communicate with the Splunk HEC instance
+    And a QuerySplunkIndexingStatus processor set up to communicate with the Splunk HEC instance
+    And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random guid
+    And the "Source" property of the PutSplunkHTTP processor is set to "my-source"
+    And the "Source Type" property of the PutSplunkHTTP processor is set to "my-source-type"
+    And the "Host" property of the PutSplunkHTTP processor is set to "my-host"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutSplunkHTTP
+    And the "success" relationship of the PutSplunkHTTP processor is connected to the QuerySplunkIndexingStatus
+    And the "undetermined" relationship of the QuerySplunkIndexingStatus processor is connected to the QuerySplunkIndexingStatus
+    And the "acknowledged" relationship of the QuerySplunkIndexingStatus processor is connected to the PutFile
+    And the "Hostname" property of the PutSplunkHTTP processor is set to "http://splunk"
+    And the "Hostname" property of the QuerySplunkIndexingStatus processor is set to "http://splunk"
+
+    When both instances start up
+    Then a flowfile with the content "foobar" is placed in the monitored directory in less than 20 seconds
+    And an event is registered in Splunk HEC with the content "foobar" with "my-source" set as source and "my-source-type" set as sourcetype and "my-host" set as host
+
+
+  Scenario: A MiNiFi instance transfers data to a Splunk HEC with SSL enabled
+    Given a Splunk HEC is set up and running
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "foobar" is present in "/tmp/input"
+    And a PutSplunkHTTP processor set up to communicate with the Splunk HEC instance
+    And a QuerySplunkIndexingStatus processor set up to communicate with the Splunk HEC instance
+    And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random guid
+    And the "Source" property of the PutSplunkHTTP processor is set to "my-source"
+    And the "Source Type" property of the PutSplunkHTTP processor is set to "my-source-type"
+    And the "Host" property of the PutSplunkHTTP processor is set to "my-host"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutSplunkHTTP
+    And the "success" relationship of the PutSplunkHTTP processor is connected to the QuerySplunkIndexingStatus
+    And the "undetermined" relationship of the QuerySplunkIndexingStatus processor is connected to the QuerySplunkIndexingStatus
+    And the "acknowledged" relationship of the QuerySplunkIndexingStatus processor is connected to the PutFile
+    And SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus
+    And the "Hostname" property of the PutSplunkHTTP processor is set to "https://splunk"
+    And the "Hostname" property of the QuerySplunkIndexingStatus processor is set to "https://splunk"
+
+    When both instances start up
+    Then a flowfile with the content "foobar" is placed in the monitored directory in less than 20 seconds
+    And an event is registered in Splunk HEC with the content "foobar" with "my-source" set as source and "my-source-type" set as sourcetype and "my-host" set as host
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 6a25ed7..5c53e31 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -5,6 +5,9 @@ import sys
 import time
 import os
 import re
+import tarfile
+import io
+import tempfile
 
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
 from .utils import retry_check
@@ -135,6 +138,63 @@ class DockerTestCluster(SingleNodeDockerCluster):
         ls_result = output.decode(self.get_stdout_encoding())
         return code == 0 and not ls_result
 
+    @retry_check()
+    def check_splunk_event(self, container_name, query):
+        (code, output) = self.client.containers.get(container_name).exec_run(["sudo", "/opt/splunk/bin/splunk", "search", query, "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        return query in output.decode("utf-8")
+
+    @retry_check()
+    def check_splunk_event_with_attributes(self, container_name, query, attributes):
+        (code, output) = self.client.containers.get(container_name).exec_run(["sudo", "/opt/splunk/bin/splunk", "search", query, "-output", "json", "-auth", "admin:splunkadmin"])
+        if code != 0:
+            return False
+        result_str = output.decode("utf-8")
+        result_lines = result_str.splitlines()
+        for result_line in result_lines:
+            result_line_json = json.loads(result_line)
+            if "result" not in result_line_json:
+                continue
+            if "host" in attributes:
+                if result_line_json["result"]["host"] != attributes["host"]:
+                    continue
+            if "source" in attributes:
+                if result_line_json["result"]["source"] != attributes["source"]:
+                    continue
+            if "sourcetype" in attributes:
+                if result_line_json["result"]["sourcetype"] != attributes["sourcetype"]:
+                    continue
+            if "index" in attributes:
+                if result_line_json["result"]["index"] != attributes["index"]:
+                    continue
+            return True
+        return False
+
+    def enable_splunk_hec_indexer(self, container_name, hec_name):
+        (code, output) = self.client.containers.get(container_name).exec_run(["sudo",
+                                                                              "/opt/splunk/bin/splunk", "http-event-collector",
+                                                                              "update", hec_name,
+                                                                              "-uri", "https://localhost:8089",
+                                                                              "-use-ack", "1",
+                                                                              "-disabled", "0",
+                                                                              "-auth", "admin:splunkadmin"])
+        return code == 0
+
+    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
+        assert self.write_content_to_container(splunk_cert_pem.decode() + splunk_key_pem.decode() + root_ca_cert_pem.decode(), dst=container_name + ':/opt/splunk/etc/auth/splunk_cert.pem')
+        assert self.write_content_to_container(root_ca_cert_pem.decode(), dst=container_name + ':/opt/splunk/etc/auth/root_ca.pem')
+        (code, output) = self.client.containers.get(container_name).exec_run(["sudo",
+                                                                              "/opt/splunk/bin/splunk", "http-event-collector",
+                                                                              "update",
+                                                                              "-uri", "https://localhost:8089",
+                                                                              "-enable-ssl", "1",
+                                                                              "-server-cert", "/opt/splunk/etc/auth/splunk_cert.pem",
+                                                                              "-ca-cert-file", "/opt/splunk/etc/auth/root_ca.pem",
+                                                                              "-require-client-cert", "1",
+                                                                              "-auth", "admin:splunkadmin"])
+        return code == 0
+
     def query_postgres_server(self, postgresql_container_name, query, number_of_rows):
         (code, output) = self.client.containers.get(postgresql_container_name).exec_run(["psql", "-U", "postgres", "-c", query])
         output = output.decode(self.get_stdout_encoding())
@@ -153,3 +213,14 @@ class DockerTestCluster(SingleNodeDockerCluster):
 
     def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
         return self.wait_for_app_logs(kafka_container_name, "Assignment received from leader for group docker_test_group", 60)
+
+    def write_content_to_container(self, content, dst):
+        container_name, dst_path = dst.split(':')
+        container = self.client.containers.get(container_name)
+        with tempfile.TemporaryDirectory() as td:
+            with tarfile.open(os.path.join(td, 'content.tar'), mode='w') as tar:
+                info = tarfile.TarInfo(name=os.path.basename(dst_path))
+                info.size = len(content)
+                tar.addfile(info, io.BytesIO(content.encode('utf-8')))
+            with open(os.path.join(td, 'content.tar'), 'rb') as data:
+                return container.put_archive(os.path.dirname(dst_path), data.read())
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 70969c8..bfd03c5 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -39,6 +39,8 @@ class ImageStore:
             image = self.__build_kafka_broker_image()
         elif container_engine == "mqtt-broker":
             image = self.__build_mqtt_broker_image()
+        elif container_engine == "splunk":
+            image = self.__build_splunk_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -135,6 +137,9 @@ class ImageStore:
 
         return self.__build_image(dockerfile)
 
+    def __build_splunk_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk')
+
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/SSL_cert_utils.py b/docker/test/integration/minifi/core/SSL_cert_utils.py
index 81bae9b..04bbcce 100644
--- a/docker/test/integration/minifi/core/SSL_cert_utils.py
+++ b/docker/test/integration/minifi/core/SSL_cert_utils.py
@@ -1,7 +1,9 @@
 import time
 import logging
+import random
 
 from M2Crypto import X509, EVP, RSA, ASN1
+from OpenSSL import crypto
 
 
 def gen_cert():
@@ -55,3 +57,80 @@ def gen_req():
     req.sign(key, 'sha256')
 
     return req, key
+
+
+def make_ca(common_name):
+    ca_key = crypto.PKey()
+    ca_key.generate_key(crypto.TYPE_RSA, 2048)
+
+    ca_cert = crypto.X509()
+    ca_cert.set_version(2)
+    ca_cert.set_serial_number(random.randint(50000000, 100000000))
+
+    ca_subj = ca_cert.get_subject()
+    ca_subj.commonName = common_name
+
+    ca_cert.add_extensions([
+        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=ca_cert),
+    ])
+
+    ca_cert.add_extensions([
+        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=ca_cert),
+    ])
+
+    ca_cert.add_extensions([
+        crypto.X509Extension(b"basicConstraints", False, b"CA:TRUE"),
+        crypto.X509Extension(b"keyUsage", False, b"keyCertSign, cRLSign"),
+    ])
+
+    ca_cert.set_issuer(ca_subj)
+    ca_cert.set_pubkey(ca_key)
+
+    ca_cert.gmtime_adj_notBefore(0)
+    ca_cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
+
+    ca_cert.sign(ca_key, 'sha256')
+
+    return ca_cert, ca_key
+
+
+def make_cert(common_name, ca_cert, ca_key):
+    key = crypto.PKey()
+    key.generate_key(crypto.TYPE_RSA, 2048)
+
+    cert = crypto.X509()
+    cert.set_version(2)
+    cert.set_serial_number(random.randint(50000000, 100000000))
+
+    client_subj = cert.get_subject()
+    client_subj.commonName = common_name
+
+    cert.add_extensions([
+        crypto.X509Extension(b"basicConstraints", False, b"CA:FALSE"),
+        crypto.X509Extension(b"subjectKeyIdentifier", False, b"hash", subject=cert),
+    ])
+
+    cert.add_extensions([
+        crypto.X509Extension(b"authorityKeyIdentifier", False, b"keyid:always", issuer=ca_cert),
+        crypto.X509Extension(b"extendedKeyUsage", False, b"clientAuth"),
+        crypto.X509Extension(b"extendedKeyUsage", False, b"serverAuth"),
+        crypto.X509Extension(b"keyUsage", False, b"digitalSignature"),
+    ])
+
+    cert.set_issuer(ca_cert.get_subject())
+    cert.set_pubkey(key)
+
+    cert.gmtime_adj_notBefore(0)
+    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
+
+    cert.sign(ca_key, 'sha256')
+
+    return cert, key
+
+
+def dump_certificate(cert):
+    return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
+
+
+def dump_privatekey(key):
+    return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 59e6ba4..acc375c 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -13,6 +13,7 @@ from .HttpProxyContainer import HttpProxyContainer
 from .PostgreSQLServerContainer import PostgreSQLServerContainer
 from .MqttBrokerContainer import MqttBrokerContainer
 from .OPCUAServerContainer import OPCUAServerContainer
+from .SplunkContainer import SplunkContainer
 
 
 class SingleNodeDockerCluster(Cluster):
@@ -83,6 +84,8 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, MqttBrokerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'opcua-server':
             return self.containers.setdefault(name, OPCUAServerContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == 'splunk':
+            return self.containers.setdefault(name, SplunkContainer(name, self.vols, self.network, self.image_store, command))
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
diff --git a/docker/test/integration/minifi/core/SplunkContainer.py b/docker/test/integration/minifi/core/SplunkContainer.py
new file mode 100644
index 0000000..fc1c806
--- /dev/null
+++ b/docker/test/integration/minifi/core/SplunkContainer.py
@@ -0,0 +1,28 @@
+import logging
+from .Container import Container
+
+
+class SplunkContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'splunk', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return "Ansible playbook complete, will begin streaming splunkd_stderr.log"
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running Splunk docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            network=self.network.name,
+            environment=[
+                "SPLUNK_LICENSE_URI=Free",
+                "SPLUNK_START_ARGS=--accept-license",
+                "SPLUNK_PASSWORD=splunkadmin"
+            ],
+            entrypoint=self.command)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/processors/PutSplunkHTTP.py b/docker/test/integration/minifi/processors/PutSplunkHTTP.py
new file mode 100644
index 0000000..8565cf5
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutSplunkHTTP.py
@@ -0,0 +1,14 @@
+from ..core.Processor import Processor
+
+
+class PutSplunkHTTP(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        super(PutSplunkHTTP, self).__init__(
+            'PutSplunkHTTP',
+            properties={
+                'Hostname': 'splunk',
+                'Port': '8088',
+                'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485'
+            },
+            auto_terminate=['success', 'failure'],
+            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py b/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py
new file mode 100644
index 0000000..5400d26
--- /dev/null
+++ b/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py
@@ -0,0 +1,15 @@
+from ..core.Processor import Processor
+
+
+class QuerySplunkIndexingStatus(Processor):
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN',
+                                 'penalization period': '1 sec'}):
+        super(QuerySplunkIndexingStatus, self).__init__(
+            'QuerySplunkIndexingStatus',
+            properties={
+                'Hostname': 'splunk',
+                'Port': '8088',
+                'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485'
+            },
+            auto_terminate=['acknowledged', 'unacknowledged', 'undetermined', 'failure'],
+            schedule=schedule)
diff --git a/docker/test/integration/resources/splunk-hec/Dockerfile b/docker/test/integration/resources/splunk-hec/Dockerfile
new file mode 100644
index 0000000..549cb9a
--- /dev/null
+++ b/docker/test/integration/resources/splunk-hec/Dockerfile
@@ -0,0 +1,2 @@
+FROM splunk/splunk:latest
+ADD conf/default.yml /tmp/defaults/default.yml
diff --git a/docker/test/integration/resources/splunk-hec/conf/default.yml b/docker/test/integration/resources/splunk-hec/conf/default.yml
new file mode 100644
index 0000000..a59ce2a
--- /dev/null
+++ b/docker/test/integration/resources/splunk-hec/conf/default.yml
@@ -0,0 +1,6 @@
+splunk:
+  hec:
+    enable: True
+    ssl: False
+    port: 8088
+    token: 176fae97-f59d-4f08-939a-aa6a543f2485
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 92d3ebe..183e6eb 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -1,6 +1,6 @@
 from minifi.core.FileSystemObserver import FileSystemObserver
 from minifi.core.RemoteProcessGroup import RemoteProcessGroup
-from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback
+from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback, make_ca, make_cert, dump_certificate, dump_privatekey
 from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
@@ -94,6 +94,7 @@ def step_impl(context, processor_type, minifi_container_name):
 @given("a {processor_type} processor set up to communicate with an Azure blob storage")
 @given("a {processor_type} processor set up to communicate with a kafka broker instance")
 @given("a {processor_type} processor set up to communicate with an MQTT broker instance")
+@given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
 def step_impl(context, processor_type):
     context.execute_steps("given a {processor_type} processor in the \"{minifi_container_name}\" flow".format(processor_type=processor_type, minifi_container_name="minifi-cpp-flow"))
 
@@ -144,6 +145,13 @@ def step_impl(context, property_name, processor_name, property_value):
         processor.set_property(property_name, property_value)
 
 
+@given("the \"{property_name}\" properties of the {processor_name_one} and {processor_name_two} processors are set to the same random guid")
+def step_impl(context, property_name, processor_name_one, processor_name_two):
+    uuid_str = str(uuid.uuid4())
+    context.test.get_node_by_name(processor_name_one).set_property(property_name, uuid_str)
+    context.test.get_node_by_name(processor_name_two).set_property(property_name, uuid_str)
+
+
 @given("the \"{property_name}\" property of the {processor_name} processor is set to match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
 def step_impl(context, property_name, processor_name, key_attribute_encoding, message_key):
     encoded_key = ""
@@ -323,6 +331,34 @@ def step_impl(context):
     context.test.acquire_container("azure-storage-server", "azure-storage-server")
 
 
+# splunk hec
+@given("a Splunk HEC is set up and running")
+def step_impl(context):
+    context.test.start_splunk()
+
+
+@given("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus")
+def step_impl(context):
+    root_ca_cert, root_ca_key = make_ca("root CA")
+    minifi_cert, minifi_key = make_cert("minifi-cpp-flow", root_ca_cert, root_ca_key)
+    splunk_cert, splunk_key = make_cert("splunk", root_ca_cert, root_ca_key)
+    minifi_crt_file = '/tmp/resources/minifi-cpp-flow.pem'
+    minifi_key_file = '/tmp/resources/minifi-cpp-flow.key'
+    root_ca_crt_file = '/tmp/resources/root_ca.pem'
+    ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
+    context.test.put_test_resource('minifi-cpp-flow.pem', dump_certificate(minifi_cert))
+    context.test.put_test_resource('minifi-cpp-flow.key', dump_privatekey(minifi_key))
+    context.test.put_test_resource('root_ca.pem', dump_certificate(root_ca_cert))
+
+    put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP")
+    put_splunk_http.controller_services.append(ssl_context_service)
+    put_splunk_http.set_property("SSL Context Service", ssl_context_service.name)
+    query_splunk_indexing_status = context.test.get_node_by_name("QuerySplunkIndexingStatus")
+    query_splunk_indexing_status.controller_services.append(ssl_context_service)
+    query_splunk_indexing_status.set_property("SSL Context Service", ssl_context_service.name)
+    context.test.cluster.enable_splunk_hec_ssl('splunk', dump_certificate(splunk_cert), dump_privatekey(splunk_key), dump_certificate(root_ca_cert))
+
+
 @given("the kafka broker is started")
 def step_impl(context):
     context.test.start_kafka_broker()
@@ -599,3 +635,15 @@ def step_impl(context, log_pattern):
 def step_impl(context):
     context.test.acquire_container("mqtt-broker", "mqtt-broker")
     context.test.start()
+
+
+# Splunk
+@then('an event is registered in Splunk HEC with the content \"{content}\"')
+def step_impl(context, content):
+    context.test.check_splunk_event("splunk", content)
+
+
+@then('an event is registered in Splunk HEC with the content \"{content}\" with \"{source}\" set as source and \"{source_type}\" set as sourcetype and \"{host}\" set as host')
+def step_impl(context, content, source, source_type, host):
+    attr = {"source": source, "sourcetype": source_type, "host": host}
+    context.test.check_splunk_event_with_attributes("splunk", content, attr)
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index 166b67c..ae2da1a 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -212,7 +212,10 @@ void HTTPClient::setContentType(std::string content_type) {
 }
 
 std::string HTTPClient::escape(std::string string_to_escape) {
-  return curl_easy_escape(http_session_, string_to_escape.c_str(), gsl::narrow<int>(string_to_escape.length()));
+  struct curl_deleter { void operator()(void* p) noexcept { curl_free(p); } };
+  std::unique_ptr<char, curl_deleter> escaped_chars{curl_easy_escape(http_session_, string_to_escape.c_str(), gsl::narrow<int>(string_to_escape.length()))};
+  std::string escaped_string(escaped_chars.get());
+  return escaped_string;
 }
 
 void HTTPClient::setPostFields(const std::string& input) {
diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp
index 5b3d881..79dc785 100644
--- a/extensions/http-curl/processors/InvokeHTTP.cpp
+++ b/extensions/http-curl/processors/InvokeHTTP.cpp
@@ -171,13 +171,6 @@ void InvokeHTTP::initialize() {
   setSupportedRelationships({Success, RelResponse, RelFailure, RelRetry, RelNoRetry});
 }
 
-bool getTimeMSFromString(const std::string& propertyName, uint64_t& valInt) {
-  core::TimeUnit unit;
-  return !propertyName.empty()
-      && core::Property::StringToTime(propertyName, valInt, unit)
-      && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt);
-}
-
 void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
   if (!context->getProperty(Method.getName(), method_)) {
     logger_->log_debug("%s attribute is missing, so default value of %s will be used", Method.getName(), Method.getValue());
diff --git a/extensions/http-curl/tests/unit/HTTPClientTests.cpp b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
index 8e9dc12..826d9d1 100644
--- a/extensions/http-curl/tests/unit/HTTPClientTests.cpp
+++ b/extensions/http-curl/tests/unit/HTTPClientTests.cpp
@@ -88,3 +88,9 @@ TEST_CASE("HTTPClientTestChunkedResponse", "[basic]") {
 
   LogTestController::getInstance().reset();
 }
+
+TEST_CASE("HTTPClient escape test") {
+  utils::HTTPClient client;
+  CHECK(client.escape("Hello Günter") == "Hello%20G%C3%BCnter");
+  CHECK(client.escape("шеллы") == "%D1%88%D0%B5%D0%BB%D0%BB%D1%8B");
+}
diff --git a/extensions/splunk/CMakeLists.txt b/extensions/splunk/CMakeLists.txt
new file mode 100644
index 0000000..4b489e0
--- /dev/null
+++ b/extensions/splunk/CMakeLists.txt
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES  "*.cpp")
+
+add_library(minifi-splunk SHARED ${SOURCES})
+
+target_include_directories(minifi-splunk PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl")
+target_link_libraries(minifi-splunk ${LIBMINIFI})
+target_link_libraries(minifi-splunk minifi-http-curl)
+
+SET(SPLUNK-EXTENSION minifi-splunk PARENT_SCOPE)
+register_extension(minifi-splunk)
+
+register_extension_linter(minifi-splunk-extensions-linter)
diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp
new file mode 100644
index 0000000..85e550a
--- /dev/null
+++ b/extensions/splunk/PutSplunkHTTP.cpp
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PutSplunkHTTP.h"
+
+#include <vector>
+#include <utility>
+
+#include "SplunkAttributes.h"
+
+#include "core/Resource.h"
+#include "utils/StringUtils.h"
+#include "client/HTTPClient.h"
+#include "utils/HTTPClient.h"
+#include "utils/OptionalUtils.h"
+
+#include "rapidjson/document.h"
+
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+const core::Property PutSplunkHTTP::Source(core::PropertyBuilder::createProperty("Source")
+    ->withDescription("Basic field describing the source of the event. If unspecified, the event will use the default defined in splunk.")
+    ->supportsExpressionLanguage(true)->build());
+
+const core::Property PutSplunkHTTP::SourceType(core::PropertyBuilder::createProperty("Source Type")
+    ->withDescription("Basic field describing the source type of the event. If unspecified, the event will use the default defined in splunk.")
+    ->supportsExpressionLanguage(true)->build());
+
+const core::Property PutSplunkHTTP::Host(core::PropertyBuilder::createProperty("Host")
+    ->withDescription("Basic field describing the host of the event. If unspecified, the event will use the default defined in splunk.")
+    ->supportsExpressionLanguage(true)->build());
+
+const core::Property PutSplunkHTTP::Index(core::PropertyBuilder::createProperty("Index")
+    ->withDescription("Identifies the index where to send the event. If unspecified, the event will use the default defined in splunk.")
+    ->supportsExpressionLanguage(true)->build());
+
+const core::Property PutSplunkHTTP::ContentType(core::PropertyBuilder::createProperty("Content Type")
+    ->withDescription("The media type of the event sent to Splunk. If not set, \"mime.type\" flow file attribute will be used. "
+                      "In case of neither of them is specified, this information will not be sent to the server.")
+    ->supportsExpressionLanguage(true)->build());
+
+
+const core::Relationship PutSplunkHTTP::Success("success", "FlowFiles that are sent successfully to the destination are sent to this relationship.");
+const core::Relationship PutSplunkHTTP::Failure("failure", "FlowFiles that failed to be sent to the destination are sent to this relationship.");
+
+// Registers the processor's relationships and its full property set: the
+// connection properties inherited from SplunkHECProcessor plus the event
+// metadata properties declared above.
+void PutSplunkHTTP::initialize() {
+  setSupportedRelationships({Success, Failure});
+  setSupportedProperties({Hostname, Port, Token, SplunkRequestChannel, SSLContext, Source, SourceType, Host, Index, ContentType});
+}
+
+// No processor-specific scheduling work: delegates to the base class, which
+// reads the required connection properties and throws when any is missing.
+void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  SplunkHECProcessor::onSchedule(context, sessionFactory);
+}
+
+
+namespace {
+// Resolves the media type of the outgoing event: the "Content Type" property
+// wins; otherwise fall back to the flow file's "mime.type" attribute.
+// Returns std::nullopt when neither is available.
+std::optional<std::string> getContentType(core::ProcessContext& context, const core::FlowFile& flow_file) {
+  return context.getProperty(PutSplunkHTTP::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");});
+}
+
+
+// Builds the URI for Splunk's raw HEC endpoint. Each of the optional event
+// metadata properties (Source Type, Source, Host, Index) that resolves for
+// this flow file is appended as a percent-encoded query parameter; with none
+// set, the bare "/services/collector/raw" path is returned.
+std::string getEndpoint(core::ProcessContext& context, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, utils::HTTPClient& client) {
+  std::stringstream endpoint;
+  endpoint << "/services/collector/raw";
+  std::vector<std::string> parameters;
+  std::string prop_value;
+  // Property values may use expression language evaluated against the flow file.
+  if (context.getProperty(PutSplunkHTTP::SourceType, prop_value, flow_file)) {
+    parameters.push_back("sourcetype=" + client.escape(prop_value));
+  }
+  if (context.getProperty(PutSplunkHTTP::Source, prop_value, flow_file)) {
+    parameters.push_back("source=" + client.escape(prop_value));
+  }
+  if (context.getProperty(PutSplunkHTTP::Host, prop_value, flow_file)) {
+    parameters.push_back("host=" + client.escape(prop_value));
+  }
+  if (context.getProperty(PutSplunkHTTP::Index, prop_value, flow_file)) {
+    parameters.push_back("index=" + client.escape(prop_value));
+  }
+  if (!parameters.empty()) {
+    endpoint << "?" << utils::StringUtils::join("&", parameters);
+  }
+  return endpoint.str();
+}
+
+// Extracts "code" and "ackId" from Splunk's JSON response body and stores
+// them on the flow file as SPLUNK_RESPONSE_CODE and SPLUNK_ACK_ID. Returns
+// true only when the body parsed and both fields were present with the
+// expected types.
+// NOTE(review): getResponseBody().data() is parsed as a C string, which
+// presumes the buffer is NUL-terminated -- confirm against HTTPClient.
+bool setAttributesFromClientResponse(core::FlowFile& flow_file, utils::HTTPClient& client) {
+  rapidjson::Document response_json;
+  rapidjson::ParseResult parse_result = response_json.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  bool result = true;
+  if (parse_result.IsError())
+    return false;
+
+  // HEC status code embedded in the JSON body (distinct from the HTTP status).
+  if (response_json.HasMember("code") && response_json["code"].IsInt())
+    flow_file.setAttribute(SPLUNK_RESPONSE_CODE, std::to_string(response_json["code"].GetInt()));
+  else
+    result = false;
+
+  // Acknowledgement id that QuerySplunkIndexingStatus later polls on.
+  if (response_json.HasMember("ackId") && response_json["ackId"].IsUint64())
+    flow_file.setAttribute(SPLUNK_ACK_ID, std::to_string(response_json["ackId"].GetUint64()));
+  else
+    result = false;
+
+  return result;
+}
+
+// Records the HTTP status code and the current wall-clock time (epoch millis;
+// consumed by QuerySplunkIndexingStatus for its timeout check) on the flow
+// file, then reports overall success: response attributes parsed AND HTTP 200.
+bool enrichFlowFileWithAttributes(core::FlowFile& flow_file, utils::HTTPClient& client) {
+  flow_file.setAttribute(SPLUNK_STATUS_CODE, std::to_string(client.getResponseCode()));
+  flow_file.setAttribute(SPLUNK_RESPONSE_TIME, std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count()));
+
+  return setAttributesFromClientResponse(flow_file, client) && client.getResponseCode() == 200;
+}
+
+// Wires the flow file's content up as the HTTP request body: reads the
+// content into the byte callback, sets Content-Length from the flow file
+// size, and applies the resolved content type (property or "mime.type"
+// attribute) when one is available. Both callback objects are owned by the
+// caller and must outlive the client's submit() call.
+void setFlowFileAsPayload(core::ProcessSession& session,
+                          core::ProcessContext& context,
+                          utils::HTTPClient& client,
+                          const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file,
+                          utils::ByteInputCallBack& payload_callback,
+                          utils::HTTPUploadCallback& payload_callback_obj) {
+  session.read(flow_file, &payload_callback);
+  payload_callback_obj.ptr = &payload_callback;
+  payload_callback_obj.pos = 0;
+  client.appendHeader("Content-Length", std::to_string(flow_file->getSize()));
+
+  // Seek support lets curl rewind the payload, e.g. on redirects/retries.
+  client.setUploadCallback(&payload_callback_obj);
+  client.setSeekFunction(&payload_callback_obj);
+
+  if (auto content_type = getContentType(context, *flow_file)) {
+    client.setContentType(content_type.value());
+  }
+}
+}  // namespace
+
+// Sends one flow file's content to Splunk's raw HEC endpoint. The flow file
+// goes to Success only when the request was submitted, returned HTTP 200, and
+// the response body carried both "code" and "ackId"; otherwise to Failure.
+void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+
+  // Nothing queued: yield instead of spinning.
+  auto ff = session->get();
+  if (!ff) {
+    context->yield();
+    return;
+  }
+  auto flow_file = gsl::not_null(std::move(ff));
+
+  utils::HTTPClient client;
+  initializeClient(client, getNetworkLocation().append(getEndpoint(*context, flow_file, client)), getSSLContextService(*context));
+
+  utils::ByteInputCallBack payload_callback;
+  utils::HTTPUploadCallback payload_callback_obj;
+  setFlowFileAsPayload(*session, *context, client, flow_file, payload_callback, payload_callback_obj);
+
+  bool success = false;
+  if (client.submit())
+    success = enrichFlowFileWithAttributes(*flow_file, client);
+
+  session->transfer(flow_file, success ? Success : Failure);
+}
+
+
+REGISTER_RESOURCE(PutSplunkHTTP, "Sends the flow file contents to the specified Splunk HTTP Event Collector over HTTP or HTTPS. Supports HEC Index Acknowledgement.");
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
+
diff --git a/extensions/splunk/PutSplunkHTTP.h b/extensions/splunk/PutSplunkHTTP.h
new file mode 100644
index 0000000..9e8360c
--- /dev/null
+++ b/extensions/splunk/PutSplunkHTTP.h
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "SplunkHECProcessor.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+class PutSplunkHTTP final : public SplunkHECProcessor {
+ public:
+  explicit PutSplunkHTTP(const std::string& name, const utils::Identifier& uuid = {})
+      : SplunkHECProcessor(name, uuid) {
+  }
+  PutSplunkHTTP(const PutSplunkHTTP&) = delete;
+  PutSplunkHTTP(PutSplunkHTTP&&) = delete;
+  PutSplunkHTTP& operator=(const PutSplunkHTTP&) = delete;
+  PutSplunkHTTP& operator=(PutSplunkHTTP&&) = delete;
+  ~PutSplunkHTTP() override = default;
+
+  EXTENSIONAPI static const core::Property Source;
+  EXTENSIONAPI static const core::Property SourceType;
+  EXTENSIONAPI static const core::Property Host;
+  EXTENSIONAPI static const core::Property Index;
+  EXTENSIONAPI static const core::Property ContentType;
+
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+  bool isSingleThreaded() const override {
+    return false;
+  }
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
+
diff --git a/extensions/splunk/QuerySplunkIndexingStatus.cpp b/extensions/splunk/QuerySplunkIndexingStatus.cpp
new file mode 100644
index 0000000..bd7e35a
--- /dev/null
+++ b/extensions/splunk/QuerySplunkIndexingStatus.cpp
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "QuerySplunkIndexingStatus.h"
+
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+
+#include "SplunkAttributes.h"
+
+#include "core/Resource.h"
+#include "client/HTTPClient.h"
+#include "utils/HTTPClient.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+const core::Property QuerySplunkIndexingStatus::MaximumWaitingTime(core::PropertyBuilder::createProperty("Maximum Waiting Time")
+    ->withDescription("The maximum time the processor tries to acquire acknowledgement confirmation for an index, from the point of registration. "
+                      "After the given amount of time, the processor considers the index as not acknowledged and transfers the FlowFile to the \"unacknowledged\" relationship.")
+    ->withDefaultValue<core::TimePeriodValue>("1 hour")->isRequired(true)->build());
+
+const core::Property QuerySplunkIndexingStatus::MaxQuerySize(core::PropertyBuilder::createProperty("Maximum Query Size")
+    ->withDescription("The maximum number of acknowledgement identifiers the outgoing query contains in one batch. "
+                      "It is recommended not to set it too low in order to reduce network communication.")
+    ->withDefaultValue<uint64_t>(1000)->isRequired(true)->build());
+
+const core::Relationship QuerySplunkIndexingStatus::Acknowledged("acknowledged",
+    "A FlowFile is transferred to this relationship when the acknowledgement was successful.");
+
+const core::Relationship QuerySplunkIndexingStatus::Unacknowledged("unacknowledged",
+    "A FlowFile is transferred to this relationship when the acknowledgement was not successful. "
+    "This can happen when the acknowledgement did not happened within the time period set for Maximum Waiting Time. "
+    "FlowFiles with acknowledgement id unknown for the Splunk server will be transferred to this relationship after the Maximum Waiting Time is reached.");
+
+const core::Relationship QuerySplunkIndexingStatus::Undetermined("undetermined",
+    "A FlowFile is transferred to this relationship when the acknowledgement state is not determined. "
+    "FlowFiles transferred to this relationship might be penalized. "
+    "This happens when Splunk returns with HTTP 200 but with false response for the acknowledgement id in the flow file attribute.");
+
+const core::Relationship QuerySplunkIndexingStatus::Failure("failure",
+    "A FlowFile is transferred to this relationship when the acknowledgement was not successful due to errors during the communication, "
+    "or if the flowfile was missing the acknowledgement id");
+
+// Registers the four routing relationships and the property set: the base
+// connection properties plus the two query-tuning properties declared above.
+void QuerySplunkIndexingStatus::initialize() {
+  setSupportedRelationships({Acknowledged, Unacknowledged, Undetermined, Failure});
+  setSupportedProperties({Hostname, Port, Token, SplunkRequestChannel, SSLContext, MaximumWaitingTime, MaxQuerySize});
+}
+
+// Reads the tuning properties on top of the base class's connection setup:
+// "Maximum Waiting Time" into max_age_ (milliseconds) and "Maximum Query
+// Size" into batch_size_. If the time string fails to parse or convert,
+// max_age_ silently keeps its declared default (1 hour).
+void QuerySplunkIndexingStatus::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  gsl_Expects(context && sessionFactory);
+  SplunkHECProcessor::onSchedule(context, sessionFactory);
+  std::string max_wait_time_str;
+  if (context->getProperty(MaximumWaitingTime.getName(), max_wait_time_str)) {
+    core::TimeUnit unit;
+    uint64_t max_wait_time;
+    if (core::Property::StringToTime(max_wait_time_str, max_wait_time, unit) && core::Property::ConvertTimeUnitToMS(max_wait_time, unit, max_wait_time)) {
+      max_age_ = std::chrono::milliseconds(max_wait_time);
+    }
+  }
+
+  context->getProperty(MaxQuerySize.getName(), batch_size_);
+}
+
+namespace {
+// Fixed acknowledgement-query endpoint on the Splunk server.
+constexpr std::string_view getEndpoint() {
+  return "/services/collector/ack";
+}
+
+// Pairs a flow file with its (initially unknown) indexing status: the status
+// stays nullopt until Splunk reports a definite true/false for its ack id.
+struct FlowFileWithIndexStatus {
+  explicit FlowFileWithIndexStatus(gsl::not_null<std::shared_ptr<core::FlowFile>>&& flow_file) : flow_file_(std::move(flow_file)) {}
+
+  gsl::not_null<std::shared_ptr<core::FlowFile>> flow_file_;
+  std::optional<bool> indexing_status_ = std::nullopt;
+};
+
+// Pulls up to batch_size flow files from the incoming queue and keys them by
+// their Splunk acknowledgement id. Flow files without a SPLUNK_ACK_ID
+// attribute, and every flow file sharing a duplicated ack id (including the
+// first occurrence, removed in the second loop), are routed straight to
+// Failure; only uniquely-identified flow files are returned for querying.
+// NOTE(review): std::stoull throws on a non-numeric attribute value and the
+// exception is not caught here -- confirm upstream guarantees the format.
+std::unordered_map<uint64_t, FlowFileWithIndexStatus> getUndeterminedFlowFiles(core::ProcessSession& session, size_t batch_size) {
+  std::unordered_map<uint64_t, FlowFileWithIndexStatus> undetermined_flow_files;
+  std::unordered_set<uint64_t> duplicate_ack_ids;
+  for (size_t i = 0; i < batch_size; ++i) {
+    auto flow = session.get();
+    if (flow == nullptr)
+      break;
+    std::optional<std::string> splunk_ack_id_str = flow->getAttribute(SPLUNK_ACK_ID);
+    if (!splunk_ack_id_str.has_value()) {
+      session.transfer(flow, QuerySplunkIndexingStatus::Failure);
+      continue;
+    }
+    uint64_t splunk_ack_id = std::stoull(splunk_ack_id_str.value());
+    if (undetermined_flow_files.contains(splunk_ack_id)) {
+      duplicate_ack_ids.insert(splunk_ack_id);
+      session.transfer(flow, QuerySplunkIndexingStatus::Failure);
+      continue;
+    }
+    undetermined_flow_files.emplace(std::make_pair(splunk_ack_id, gsl::not_null(std::move(flow))));
+  }
+  // Also fail the first-seen flow file of every duplicated ack id.
+  for (auto duplicate_ack_id : duplicate_ack_ids) {
+    session.transfer(undetermined_flow_files.at(duplicate_ack_id).flow_file_, QuerySplunkIndexingStatus::Failure);
+    undetermined_flow_files.erase(duplicate_ack_id);
+  }
+  return undetermined_flow_files;
+}
+
+// Serializes the pending ack ids into the request body Splunk's ack endpoint
+// expects: {"acks": [id, id, ...]}.
+// NOTE(review): AddMember is passed rapidjson::kArrayType directly -- this
+// should select the array-typed Value constructor (empty array), not an
+// integer; worth confirming against the rapidjson AddMember overloads.
+std::string getAckIdsAsPayload(const std::unordered_map<uint64_t, FlowFileWithIndexStatus>& undetermined_flow_files) {
+  rapidjson::Document payload = rapidjson::Document(rapidjson::kObjectType);
+  payload.AddMember("acks", rapidjson::kArrayType, payload.GetAllocator());
+  for (const auto& [ack_id, ff_status] : undetermined_flow_files) {
+    payload["acks"].PushBack(ack_id, payload.GetAllocator());
+  }
+  rapidjson::StringBuffer buffer;
+  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+  payload.Accept(writer);
+  return buffer.GetString();
+}
+
+// Submits the prepared ack query (the caller set the POST body beforehand)
+// and, on an HTTP 200 with a parseable body, copies each true/false reported
+// under "acks" into the matching entry's indexing_status_. Entries Splunk
+// does not mention -- and all entries on any transport or parse failure --
+// keep their nullopt status and are routed to Failure by the caller.
+void getIndexingStatusFromSplunk(utils::HTTPClient& client, std::unordered_map<uint64_t, FlowFileWithIndexStatus>& undetermined_flow_files) {
+  rapidjson::Document response;
+  if (!client.submit())
+    return;
+  if (client.getResponseCode() != 200)
+    return;
+  response = rapidjson::Document();
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError() || !response.HasMember("acks"))
+    return;
+
+  // "acks" maps stringified ack ids to booleans (acknowledged or not).
+  rapidjson::Value& acks = response["acks"];
+  for (auto& [ack_id, ff_status]: undetermined_flow_files) {
+    if (acks.HasMember(std::to_string(ack_id).c_str()) && acks[std::to_string(ack_id).c_str()].IsBool())
+      ff_status.indexing_status_ = acks[std::to_string(ack_id).c_str()].GetBool();
+  }
+}
+
+// True when the flow file has waited longer than max_age since PutSplunkHTTP
+// recorded SPLUNK_RESPONSE_TIME (epoch milliseconds), or when that attribute
+// is missing altogether (treated as expired).
+// NOTE(review): std::stoull throws on a non-numeric attribute value -- not
+// caught here.
+bool flowFileAcknowledgementTimedOut(const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, std::chrono::milliseconds max_age) {
+  using std::chrono::system_clock;
+  using std::chrono::milliseconds;
+  std::optional<std::string> splunk_response_time_str = flow_file->getAttribute(SPLUNK_RESPONSE_TIME);
+  if (!splunk_response_time_str.has_value())
+    return true;
+  uint64_t splunk_response_time = std::stoull(splunk_response_time_str.value());
+  if (system_clock::now() > std::chrono::system_clock::time_point() + std::chrono::milliseconds(splunk_response_time) + max_age)
+    return true;
+  return false;
+}
+
+// Routing policy for queried flow files:
+//   no status from Splunk            -> Failure
+//   acknowledged                     -> Acknowledged
+//   not acked, waiting window passed -> Unacknowledged
+//   not acked, still within window   -> penalized + Undetermined (retry later)
+void routeFlowFilesBasedOnIndexingStatus(core::ProcessSession& session,
+                                         const std::unordered_map<uint64_t, FlowFileWithIndexStatus>& flow_files_with_index_statuses,
+                                         std::chrono::milliseconds max_age) {
+  for (const auto& [ack_id, ff_status] : flow_files_with_index_statuses) {
+    if (!ff_status.indexing_status_.has_value()) {
+      session.transfer(ff_status.flow_file_, QuerySplunkIndexingStatus::Failure);
+    } else {
+      if (ff_status.indexing_status_.value()) {
+        session.transfer(ff_status.flow_file_, QuerySplunkIndexingStatus::Acknowledged);
+      } else if (flowFileAcknowledgementTimedOut(ff_status.flow_file_, max_age)) {
+        session.transfer(ff_status.flow_file_, QuerySplunkIndexingStatus::Unacknowledged);
+      } else {
+        session.penalize(ff_status.flow_file_);
+        session.transfer(ff_status.flow_file_, QuerySplunkIndexingStatus::Undetermined);
+      }
+    }
+  }
+}
+}  // namespace
+
+// Queries Splunk for the indexing status of up to batch_size_ queued flow
+// files in a single HTTP request and routes each one by the reply.
+// NOTE(review): ack_request is never used -- candidate for removal.
+// NOTE(review): the HTTP client is initialized before discovering whether the
+// incoming queue is empty, and the empty case returns without yielding.
+void QuerySplunkIndexingStatus::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session);
+  std::string ack_request;
+
+  utils::HTTPClient client;
+  initializeClient(client, getNetworkLocation().append(getEndpoint()), getSSLContextService(*context));
+  auto undetermined_flow_files = getUndeterminedFlowFiles(*session, batch_size_);
+  if (undetermined_flow_files.empty())
+    return;
+  client.setPostFields(getAckIdsAsPayload(undetermined_flow_files));
+  getIndexingStatusFromSplunk(client, undetermined_flow_files);
+  routeFlowFilesBasedOnIndexingStatus(*session, undetermined_flow_files, max_age_);
+}
+
+
+REGISTER_RESOURCE(QuerySplunkIndexingStatus, "Queries Splunk server in order to acquire the status of indexing acknowledgement.");
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
diff --git a/extensions/splunk/QuerySplunkIndexingStatus.h b/extensions/splunk/QuerySplunkIndexingStatus.h
new file mode 100644
index 0000000..7c9b3af
--- /dev/null
+++ b/extensions/splunk/QuerySplunkIndexingStatus.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <string>
+#include <memory>
+
+#include "SplunkHECProcessor.h"
+#include "utils/gsl.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+class QuerySplunkIndexingStatus final : public SplunkHECProcessor {
+ public:
+  explicit QuerySplunkIndexingStatus(const std::string& name, const utils::Identifier& uuid = {})
+      : SplunkHECProcessor(name, uuid) {
+  }
+  QuerySplunkIndexingStatus(const QuerySplunkIndexingStatus&) = delete;
+  QuerySplunkIndexingStatus(QuerySplunkIndexingStatus&&) = delete;
+  QuerySplunkIndexingStatus& operator=(const QuerySplunkIndexingStatus&) = delete;
+  QuerySplunkIndexingStatus& operator=(QuerySplunkIndexingStatus&&) = delete;
+  ~QuerySplunkIndexingStatus() override = default;
+
+  EXTENSIONAPI static const core::Property MaximumWaitingTime;
+  EXTENSIONAPI static const core::Property MaxQuerySize;
+
+  EXTENSIONAPI static const core::Relationship Acknowledged;
+  EXTENSIONAPI static const core::Relationship Unacknowledged;
+  EXTENSIONAPI static const core::Relationship Undetermined;
+  EXTENSIONAPI static const core::Relationship Failure;
+
+  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+  bool isSingleThreaded() const override {
+    return true;
+  }
+
+ protected:
+  uint32_t batch_size_ = 1000;
+  std::chrono::milliseconds max_age_ = std::chrono::hours(1);
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
diff --git a/extensions/splunk/SplunkAttributes.h b/extensions/splunk/SplunkAttributes.h
new file mode 100644
index 0000000..3aa000b
--- /dev/null
+++ b/extensions/splunk/SplunkAttributes.h
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+constexpr const char* SPLUNK_ACK_ID = "splunk.acknowledgement.id";
+constexpr const char* SPLUNK_RESPONSE_TIME = "splunk.responded.at";
+constexpr const char* SPLUNK_STATUS_CODE = "splunk.status.code";
+constexpr const char* SPLUNK_RESPONSE_CODE = "splunk.response.code";
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
diff --git a/extensions/splunk/SplunkHECProcessor.cpp b/extensions/splunk/SplunkHECProcessor.cpp
new file mode 100644
index 0000000..4e315d0
--- /dev/null
+++ b/extensions/splunk/SplunkHECProcessor.cpp
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SplunkHECProcessor.h"
+#include "client/HTTPClient.h"
+#include "utils/HTTPClient.h"
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+const core::Property SplunkHECProcessor::Hostname(core::PropertyBuilder::createProperty("Hostname")
+    ->withDescription("The ip address or hostname of the Splunk server.")
+    ->isRequired(true)->build());
+
+const core::Property SplunkHECProcessor::Port(core::PropertyBuilder::createProperty("Port")
+    ->withDescription("The HTTP Event Collector HTTP Port Number.")
+    ->withDefaultValue<int>(8088, core::StandardValidators::get().PORT_VALIDATOR)->isRequired(true)->build());
+
+const core::Property SplunkHECProcessor::Token(core::PropertyBuilder::createProperty("Token")
+    ->withDescription("HTTP Event Collector token starting with the string Splunk. For example \'Splunk 1234578-abcd-1234-abcd-1234abcd\'")
+    ->isRequired(true)->build());
+
+const core::Property SplunkHECProcessor::SplunkRequestChannel(core::PropertyBuilder::createProperty("Splunk Request Channel")
+    ->withDescription("Identifier of the used request channel.")->isRequired(true)->build());
+
+const core::Property SplunkHECProcessor::SSLContext(core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)->withExclusiveProperty("Hostname", "^http:.*$")
+    ->asType<minifi::controllers::SSLContextService>()->build());
+
+// Registers the shared HEC connection properties. Subclasses re-register
+// these alongside their own properties and relationships.
+void SplunkHECProcessor::initialize() {
+  setSupportedProperties({Hostname, Port, Token, SplunkRequestChannel});
+}
+
+// Caches the four required connection properties into members; each missing
+// property aborts scheduling with a PROCESS_SCHEDULE_EXCEPTION.
+void SplunkHECProcessor::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+  if (!context->getProperty(Hostname.getName(), hostname_))
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to get Hostname");
+
+  if (!context->getProperty(Port.getName(), port_))
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to get Port");
+
+  if (!context->getProperty(Token.getName(), token_))
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to get Token");
+
+  if (!context->getProperty(SplunkRequestChannel.getName(), request_channel_))
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Failed to get SplunkRequestChannel");
+}
+
+// "host:port" prefix that request endpoints are appended to.
+std::string SplunkHECProcessor::getNetworkLocation() const {
+  return hostname_ + ":" + port_;
+}
+
+// Looks up the controller service named by the "SSL Context Service"
+// property; returns nullptr when the property is unset or empty, or when the
+// named service is not an SSLContextService (dynamic cast fails).
+std::shared_ptr<minifi::controllers::SSLContextService> SplunkHECProcessor::getSSLContextService(core::ProcessContext& context) const {
+  std::string context_name;
+  if (context.getProperty(SSLContext.getName(), context_name) && !IsNullOrEmpty(context_name))
+    return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(context_name));
+  return nullptr;
+}
+
+// Prepares a POST request to the given URL with the HEC auth headers. The
+// Token property is sent verbatim as the Authorization value, so per the
+// property description it is expected to carry the "Splunk " prefix.
+void SplunkHECProcessor::initializeClient(utils::HTTPClient& client, const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) const {
+  client.initialize("POST", url, ssl_context_service);
+  client.appendHeader("Authorization", token_);
+  client.appendHeader("X-Splunk-Request-Channel", request_channel_);
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::splunk
diff --git a/extensions/splunk/SplunkHECProcessor.h b/extensions/splunk/SplunkHECProcessor.h
new file mode 100644
index 0000000..f931b26
--- /dev/null
+++ b/extensions/splunk/SplunkHECProcessor.h
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <string>
+#include <memory>
+
+#include "core/Processor.h"
+
+
+namespace org::apache::nifi::minifi::utils {
+class HTTPClient;
+}
+
+namespace org::apache::nifi::minifi::extensions::splunk {
+
+class SplunkHECProcessor : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property Hostname;
+  EXTENSIONAPI static const core::Property Port;
+  EXTENSIONAPI static const core::Property Token;
+  EXTENSIONAPI static const core::Property SplunkRequestChannel;
+  EXTENSIONAPI static const core::Property SSLContext;
+
+  explicit SplunkHECProcessor(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid) {
+  }
+  ~SplunkHECProcessor() override = default;
+
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+
+  core::annotation::Input getInputRequirement() const override {
+    return core::annotation::Input::INPUT_REQUIRED;
+  }
+
+ protected:
+  std::string getNetworkLocation() const;
+  std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(core::ProcessContext& context) const;
+  void initializeClient(utils::HTTPClient& client, const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service) const;
+
+  std::string token_;
+  std::string hostname_;
+  std::string port_;
+  std::string request_channel_;
+};
+}  // namespace org::apache::nifi::minifi::extensions::splunk
diff --git a/extensions/splunk/tests/CMakeLists.txt b/extensions/splunk/tests/CMakeLists.txt
new file mode 100644
index 0000000..1ca2257
--- /dev/null
+++ b/extensions/splunk/tests/CMakeLists.txt
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB SPLUNK_TESTS  "*.cpp")
+
+SET(SPLUNK_TEST_COUNT 0)
+FOREACH(testfile ${SPLUNK_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/splunk")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/")
+
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    target_link_libraries(${testfilename} minifi-splunk)
+    target_link_libraries(${testfilename} minifi-civet-extensions)
+    target_link_libraries(${testfilename} minifi-http-curl)
+    target_link_libraries(${testfilename} minifi-standard-processors)
+    MATH(EXPR SPLUNK_TEST_COUNT "${SPLUNK_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${SPLUNK_TEST_COUNT} Splunk related test file(s)...")
diff --git a/extensions/splunk/tests/MockSplunkHEC.h b/extensions/splunk/tests/MockSplunkHEC.h
new file mode 100644
index 0000000..0b6351b
--- /dev/null
+++ b/extensions/splunk/tests/MockSplunkHEC.h
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include <CivetServer.h>
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+
+class MockSplunkHandler : public CivetHandler {
+ public:
+  explicit MockSplunkHandler(std::string token, std::function<void(const struct mg_request_info *request_info)>& assertions) : token_(std::move(token)), assertions_(assertions) {
+  }
+
+  enum HeaderResult {
+    MissingAuth,
+    InvalidAuth,
+    MissingReqChannel,
+    HeadersOk
+  };
+
+  bool handlePost(CivetServer*, struct mg_connection *conn) override {
+    switch (checkHeaders(conn)) {
+      case MissingAuth:
+        return send401(conn);
+      case InvalidAuth:
+        return send403(conn);
+      case MissingReqChannel:
+        return send400(conn);
+      case HeadersOk:
+        return handlePostImpl(conn);
+    }
+    return false;
+  }
+
+  HeaderResult checkHeaders(struct mg_connection *conn) const {
+    const struct mg_request_info* req_info = mg_get_request_info(conn);
+    assertions_(req_info);
+    auto auth_header = std::find_if(std::begin(req_info->http_headers),
+                                    std::end(req_info->http_headers),
+                                    [](auto header) -> bool {return strcmp(header.name, "Authorization") == 0;});
+    if (auth_header == std::end(req_info->http_headers))
+      return MissingAuth;
+    if (strcmp(auth_header->value, token_.c_str()) != 0)
+      return InvalidAuth;
+
+    auto request_channel_header = std::find_if(std::begin(req_info->http_headers),
+                                               std::end(req_info->http_headers),
+                                               [](auto header) -> bool {return strcmp(header.name, "X-Splunk-Request-Channel") == 0;});
+
+    if (request_channel_header == std::end(req_info->http_headers))
+      return MissingReqChannel;
+    return HeadersOk;
+  }
+
+  bool send400(struct mg_connection *conn) const {
+    constexpr const char * body = "{\"text\":\"Data channel is missing\",\"code\":10}";
+    mg_printf(conn, "HTTP/1.1 400 Bad Request\r\n");
+    mg_printf(conn, "Content-length: %lu", strlen(body));
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, body);
+    return true;
+  }
+
+  bool send401(struct mg_connection *conn) const {
+    constexpr const char * body = "{\"text\":\"Token is required\",\"code\":2}";
+    mg_printf(conn, "HTTP/1.1 401 Unauthorized\r\n");
+    mg_printf(conn, "Content-length: %lu", strlen(body));
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, body);
+    return true;
+  }
+
+  bool send403(struct mg_connection *conn) const {
+    constexpr const char * body = "{\"text\":\"Invalid token\",\"code\":4}";
+    mg_printf(conn, "HTTP/1.1 403 Forbidden\r\n");
+    mg_printf(conn, "Content-length: %lu", strlen(body));
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, body);
+    return true;
+  }
+
+ protected:
+  virtual bool handlePostImpl(struct mg_connection *conn) = 0;
+  std::string token_;
+  std::function<void(const struct mg_request_info *request_info)>& assertions_;
+};
+
+class RawCollectorHandler : public MockSplunkHandler {
+ public:
+  explicit RawCollectorHandler(std::string token, std::function<void(const struct mg_request_info *request_info)>& assertions) : MockSplunkHandler(std::move(token), assertions) {}
+ protected:
+  bool handlePostImpl(struct mg_connection* conn) override {
+    constexpr const char * body = "{\"text\":\"Success\",\"code\":0,\"ackId\":808}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+    mg_printf(conn, "Content-length: %lu", strlen(body));
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, body);
+    return true;
+  }
+};
+
+class AckIndexerHandler : public MockSplunkHandler {
+ public:
+  explicit AckIndexerHandler(std::string token, std::vector<uint64_t> indexed_events, std::function<void(const struct mg_request_info *request_info)>& assertions)
+      : MockSplunkHandler(std::move(token), assertions), indexed_events_(indexed_events) {}
+
+ protected:
+  bool handlePostImpl(struct mg_connection* conn) override {
+    std::vector<char> data;
+    data.reserve(2048);
+    mg_read(conn, data.data(), 2048);
+    rapidjson::Document post_data;
+
+    rapidjson::ParseResult parse_result = post_data.Parse<rapidjson::kParseStopWhenDoneFlag>(data.data());
+    if (parse_result.IsError())
+      return sendInvalidFormat(conn);
+    if (!post_data.HasMember("acks") || !post_data["acks"].IsArray())
+      return sendInvalidFormat(conn);
+    std::vector<uint64_t> ids;
+    for (auto& id : post_data["acks"].GetArray()) {
+      ids.push_back(id.GetUint64());
+    }
+    rapidjson::Document reply = rapidjson::Document(rapidjson::kObjectType);
+    reply.AddMember("acks", rapidjson::kObjectType, reply.GetAllocator());
+    for (auto& id : ids) {
+      rapidjson::Value key(std::to_string(id).c_str(), reply.GetAllocator());
+      reply["acks"].AddMember(key, std::find(indexed_events_.begin(), indexed_events_.end(), id) != indexed_events_.end() ? true : false, reply.GetAllocator());
+    }
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    reply.Accept(writer);
+
+    mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+    mg_printf(conn, "Content-length: %lu", buffer.GetSize());
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, "%s" , buffer.GetString());
+    return true;
+  }
+
+  bool sendInvalidFormat(struct mg_connection* conn) {
+    constexpr const char * body = "{\"text\":\"Invalid data format\",\"code\":6}";
+    mg_printf(conn, "HTTP/1.1 400 Bad Request\r\n");
+    mg_printf(conn, "Content-length: %lu", strlen(body));
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, body);
+    return true;
+  }
+
+  std::vector<uint64_t> indexed_events_;
+};
+
+class MockSplunkHEC {
+  struct CivetLibrary{
+    CivetLibrary() {
+      if (getCounter()++ == 0) {
+        mg_init_library(0);
+      }
+    }
+    ~CivetLibrary() {
+      if (--getCounter() == 0) {
+        mg_exit_library();
+      }
+    }
+   private:
+    static std::atomic<int>& getCounter() {
+      static std::atomic<int> counter{0};
+      return counter;
+    }
+  };
+
+ public:
+  static constexpr const char* TOKEN = "Splunk 822f7d13-2b70-4f8c-848b-86edfc251222";
+
+  static inline std::vector<uint64_t> indexed_events = {0, 1};
+
+  explicit MockSplunkHEC(std::string port) : port_(std::move(port)) {
+    std::vector<std::string> options;
+    options.emplace_back("listening_ports");
+    options.emplace_back(port_);
+    server_.reset(new CivetServer(options, &callbacks_, &logger_));
+    {
+      MockSplunkHandler* raw_collector_handler = new RawCollectorHandler(TOKEN, assertions_);
+      server_->addHandler("/services/collector/raw", raw_collector_handler);
+      handlers_.emplace_back(std::move(raw_collector_handler));
+    }
+    {
+      MockSplunkHandler* ack_indexer_handler = new AckIndexerHandler(TOKEN, indexed_events, assertions_);
+      server_->addHandler("/services/collector/ack", ack_indexer_handler);
+      handlers_.emplace_back(std::move(ack_indexer_handler));
+    }
+  }
+
+  const std::string& getPort() const {
+    return port_;
+  }
+
+  void setAssertions(std::function<void(const struct mg_request_info *request_info)> assertions) {
+    assertions_ = assertions;
+  }
+
+
+ private:
+  CivetLibrary lib_;
+  std::string port_;
+  std::unique_ptr<CivetServer> server_;
+  std::vector<std::unique_ptr<MockSplunkHandler>> handlers_;
+  CivetCallbacks callbacks_;
+  std::function<void(const struct mg_request_info *request_info)> assertions_ = [](const struct mg_request_info*) {};
+  std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = org::apache::nifi::minifi::core::logging::LoggerFactory<MockSplunkHEC>::getLogger();
+};
diff --git a/extensions/splunk/tests/PutSplunkHTTPTests.cpp b/extensions/splunk/tests/PutSplunkHTTPTests.cpp
new file mode 100644
index 0000000..4409303
--- /dev/null
+++ b/extensions/splunk/tests/PutSplunkHTTPTests.cpp
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutSplunkHTTP.h"
+#include "SplunkAttributes.h"
+#include "TestBase.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "WriteToFlowFileTestProcessor.h"
+#include "processors/UpdateAttribute.h"
+#include "MockSplunkHEC.h"
+
+using PutSplunkHTTP = org::apache::nifi::minifi::extensions::splunk::PutSplunkHTTP;
+using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
+using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
+using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+
+
+TEST_CASE("PutSplunkHTTP tests", "[putsplunkhttp]") {
+  MockSplunkHEC mock_splunk_hec("10133");
+
+  TestController test_controller;
+  auto plan = test_controller.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto put_splunk_http = std::dynamic_pointer_cast<PutSplunkHTTP>(plan->addProcessor("PutSplunkHTTP", "put_splunk_http"));
+  auto read_from_success = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_success"));
+  auto read_from_failure = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure"));
+
+  plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, put_splunk_http);
+  plan->addConnection(put_splunk_http, PutSplunkHTTP::Success, read_from_success);
+  plan->addConnection(put_splunk_http, PutSplunkHTTP::Failure, read_from_failure);
+
+  read_from_success->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  read_from_failure->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Hostname.getName(), "localhost");
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Port.getName(), mock_splunk_hec.getPort());
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Token.getName(), MockSplunkHEC::TOKEN);
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::SplunkRequestChannel.getName(), "a12254b4-f481-435d-896d-3b6033eabe58");
+
+  write_to_flow_file->setContent("foobar");
+
+  SECTION("Happy path") {
+    mock_splunk_hec.setAssertions([](const struct mg_request_info *request_info) {
+      CHECK(request_info->query_string == nullptr);
+    });
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_success->readFlowFileWithContent("foobar"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_STATUS_CODE, "200"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_CODE, "0"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID));
+  }
+
+  SECTION("Happy path with query arguments") {
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::Source.getName(), "foo");
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::SourceType.getName(), "bar");
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::Host.getName(), "baz");
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::Index.getName(), "qux");
+    mock_splunk_hec.setAssertions([](const struct mg_request_info *request_info) {
+      std::string query_string = request_info->query_string;
+      CHECK(!query_string.empty());
+      CHECK(query_string.find("source=foo") != std::string::npos);
+      CHECK(query_string.find("sourcetype=bar") != std::string::npos);
+      CHECK(query_string.find("host=baz") != std::string::npos);
+      CHECK(query_string.find("index=qux") != std::string::npos);
+    });
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_success->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_success->readFlowFileWithContent("foobar"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_STATUS_CODE, "200"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_CODE, "0"));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME));
+    CHECK(read_from_success->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID));
+  }
+
+  SECTION("Invalid Token") {
+    constexpr const char* invalid_token = "Splunk 00000000-0000-0000-0000-000000000000";
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::Token.getName(), invalid_token);
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_success->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_failure->readFlowFileWithContent("foobar"));
+    CHECK(read_from_failure->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_STATUS_CODE, "403"));
+    CHECK(read_from_failure->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_CODE, "4"));
+    CHECK(read_from_failure->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME));
+    CHECK_FALSE(read_from_failure->readFlowFileWithAttribute(org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID));
+  }
+}
+
+namespace {
+struct ContentTypeValidator {
+  explicit ContentTypeValidator(std::string required_content_type) : required_content_type_(std::move(required_content_type)) {
+  }
+  void operator() (const struct mg_request_info* req_info) const {
+    if (!required_content_type_)
+      return;
+    auto content_type_header = std::find_if(std::begin(req_info->http_headers),
+                                            std::end(req_info->http_headers),
+                                            [](auto header) -> bool {return strcmp(header.name, "Content-Type") == 0;});
+    REQUIRE(content_type_header != std::end(req_info->http_headers));
+    CHECK(strcmp(content_type_header->value, required_content_type_.value().c_str()) == 0);
+  }
+  std::optional<std::string> required_content_type_;
+};
+}  // namespace
+
+TEST_CASE("PutSplunkHTTP content type tests", "[putsplunkhttpcontenttype]") {
+  MockSplunkHEC mock_splunk_hec("10131");
+
+  TestController test_controller;
+  auto plan = test_controller.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto update_attribute = std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute", "update_attribute"));
+  auto put_splunk_http = std::dynamic_pointer_cast<PutSplunkHTTP>(plan->addProcessor("PutSplunkHTTP", "put_splunk_http"));
+
+  plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, update_attribute);
+  plan->addConnection(update_attribute, UpdateAttribute::Success, put_splunk_http);
+  put_splunk_http->setAutoTerminatedRelationships({PutSplunkHTTP::Success, PutSplunkHTTP::Failure});
+
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Hostname.getName(), "localhost");
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Port.getName(), mock_splunk_hec.getPort());
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::Token.getName(), MockSplunkHEC::TOKEN);
+  plan->setProperty(put_splunk_http, PutSplunkHTTP::SplunkRequestChannel.getName(), "a12254b4-f481-435d-896d-3b6033eabe58");
+
+  write_to_flow_file->setContent("foobar");
+
+  SECTION("Content Type without Property or Attribute") {
+    mock_splunk_hec.setAssertions(ContentTypeValidator("application/x-www-form-urlencoded"));
+    test_controller.runSession(plan);
+  }
+
+  SECTION("Content Type with Processor Property") {
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::ContentType.getName(), "from_property");
+    mock_splunk_hec.setAssertions(ContentTypeValidator("from_property"));
+    test_controller.runSession(plan);
+  }
+
+  SECTION("Content Type with FlowFile Attribute") {
+    plan->setProperty(update_attribute, "mime.type", "from_attribute", true);
+    mock_splunk_hec.setAssertions(ContentTypeValidator("from_attribute"));
+    test_controller.runSession(plan);
+  }
+
+  SECTION("Content Type with Property and Attribute") {
+    plan->setProperty(update_attribute, "mime.type", "from_attribute", true);
+    plan->setProperty(put_splunk_http, PutSplunkHTTP::ContentType.getName(), "from_property");
+    mock_splunk_hec.setAssertions(ContentTypeValidator("from_property"));
+    test_controller.runSession(plan);
+  }
+}
+
diff --git a/extensions/splunk/tests/QuerySplunkIndexingStatusTests.cpp b/extensions/splunk/tests/QuerySplunkIndexingStatusTests.cpp
new file mode 100644
index 0000000..8fc2794
--- /dev/null
+++ b/extensions/splunk/tests/QuerySplunkIndexingStatusTests.cpp
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+
+#include "QuerySplunkIndexingStatus.h"
+#include "MockSplunkHEC.h"
+#include "SplunkAttributes.h"
+#include "TestBase.h"
+#include "processors/UpdateAttribute.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "WriteToFlowFileTestProcessor.h"
+
+using QuerySplunkIndexingStatus = org::apache::nifi::minifi::extensions::splunk::QuerySplunkIndexingStatus;
+using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
+using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
+using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+using namespace std::literals::chrono_literals;
+
+TEST_CASE("QuerySplunkIndexingStatus tests", "[querysplunkindexingstatus]") {
+  MockSplunkHEC mock_splunk_hec("10132");
+
+  TestController test_controller;
+  auto plan = test_controller.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto update_attribute = std::dynamic_pointer_cast<UpdateAttribute>(plan->addProcessor("UpdateAttribute", "update_attribute"));
+  auto query_splunk_indexing_status = std::dynamic_pointer_cast<QuerySplunkIndexingStatus>(plan->addProcessor("QuerySplunkIndexingStatus", "query_splunk_indexing_status"));
+  auto read_from_acknowledged = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_acknowledged"));
+  auto read_from_undetermined = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_undetermined"));
+  auto read_from_unacknowledged = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_unacknowledged"));
+  auto read_from_failure = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_failure"));
+
+  plan->addConnection(write_to_flow_file, WriteToFlowFileTestProcessor::Success, update_attribute);
+  plan->addConnection(update_attribute, UpdateAttribute ::Success, query_splunk_indexing_status);
+  plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Acknowledged, read_from_acknowledged);
+  plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Undetermined, read_from_undetermined);
+  plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Unacknowledged, read_from_unacknowledged);
+  plan->addConnection(query_splunk_indexing_status, QuerySplunkIndexingStatus::Failure, read_from_failure);
+
+  read_from_acknowledged->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  read_from_undetermined->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  read_from_unacknowledged->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+  read_from_failure->setAutoTerminatedRelationships({ReadFromFlowFileTestProcessor::Success});
+
+  plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Hostname.getName(), "localhost");
+  plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Port.getName(), mock_splunk_hec.getPort());
+  plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::Token.getName(), MockSplunkHEC::TOKEN);
+  plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::SplunkRequestChannel.getName(), "a12254b4-f481-435d-896d-3b6033eabe58");
+
+  auto response_timestamp = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count());
+  plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp, true);
+
+  write_to_flow_file->setContent("foobar");
+
+  SECTION("Querying indexed id") {
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, std::to_string(MockSplunkHEC::indexed_events[0]), true);
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 1);
+  }
+
+  SECTION("Querying not indexed id") {
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "100", true);
+    query_splunk_indexing_status->setPenalizationPeriod(50ms);
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 0);  // result penalized
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 0);
+
+    write_to_flow_file->setContent("");
+    plan->reset();
+    std::this_thread::sleep_for(std::chrono::milliseconds(100ms));
+    test_controller.runSession(plan);
+
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 0);
+  }
+
+  SECTION("Querying not indexed old id") {
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "100", true);
+    response_timestamp = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::system_clock::now() - 2h).time_since_epoch()).count());
+
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp, true);
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 0);
+  }
+
+  SECTION("Multiple inputs with same id") {
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, std::to_string(MockSplunkHEC::indexed_events[0]), true);
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_RESPONSE_TIME, response_timestamp, true);
+    for (size_t i = 0; i < 4; ++i) {
+      plan->runProcessor(write_to_flow_file);
+      plan->runProcessor(update_attribute);
+    }
+    plan->runProcessor(query_splunk_indexing_status);
+    plan->runProcessor(read_from_failure);
+    plan->runProcessor(read_from_undetermined);
+    plan->runProcessor(read_from_unacknowledged);
+    plan->runProcessor(read_from_acknowledged);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 4);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 0);
+  }
+
+  SECTION("MaxQuerySize can limit the number of queries") {
+    plan->setProperty(query_splunk_indexing_status, QuerySplunkIndexingStatus::MaxQuerySize.getName(), "5");
+    for (size_t i = 0; i < 10; ++i) {
+      plan->runProcessor(write_to_flow_file);
+      plan->runProcessor(update_attribute);
+    }
+    plan->runProcessor(query_splunk_indexing_status);
+    CHECK(plan->getNumFlowFileProducedByProcessor(query_splunk_indexing_status) == 5);
+  }
+
+  SECTION("Input flow file has no attributes") {
+    test_controller.runSession(plan);
+    CHECK(read_from_failure->numberOfFlowFilesRead() == 1);
+    CHECK(read_from_undetermined->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_unacknowledged->numberOfFlowFilesRead() == 0);
+    CHECK(read_from_acknowledged->numberOfFlowFilesRead() == 0);
+  }
+
+  SECTION("Invalid index") {
+    plan->setProperty(update_attribute, org::apache::nifi::minifi::extensions::splunk::SPLUNK_ACK_ID, "foo", true);
+    REQUIRE_THROWS(test_controller.runSession(plan));
+  }
+}
+
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 64f871a..1a8bc4b 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#define EXTENSION_LIST "*minifi-*,!*http-curl*,!*coap*"
+#define EXTENSION_LIST "*minifi-*,!*http-curl*,!*coap*,!*splunk*"
 
 #include <cstdio>
 #include <utility>
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.cpp b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
index 7f41fc1..c33bdb2 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.cpp
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.cpp
@@ -48,13 +48,13 @@ struct ReadFlowFileIntoBuffer : public InputStreamCallback {
 void ReadFromFlowFileTestProcessor::onTrigger(core::ProcessContext* context, core::ProcessSession* session) {
   gsl_Expects(context && session);
   logger_->log_info("%s", ON_TRIGGER_LOG_STR);
-  flow_file_contents_.clear();
+  flow_files_read_.clear();
 
   while (std::shared_ptr<core::FlowFile> flow_file = session->get()) {
     ReadFlowFileIntoBuffer callback;
     session->read(flow_file, &callback);
-    flow_file_contents_.push_back(std::string(callback.buffer_.begin(), callback.buffer_.end()));
     session->transfer(flow_file, Success);
+    flow_files_read_.emplace_back(session, gsl::not_null(std::move(flow_file)));
   }
 }
 
@@ -62,6 +62,30 @@ void ReadFromFlowFileTestProcessor::onUnSchedule() {
   logger_->log_info("%s", ON_UNSCHEDULE_LOG_STR);
 }
 
+ReadFromFlowFileTestProcessor::FlowFileData::FlowFileData(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file) {
+  ReadFlowFileIntoBuffer callback;
+  session->read(flow_file, &callback);
+  content_ = std::string(callback.buffer_.begin(), callback.buffer_.end());
+  attributes_ = flow_file->getAttributes();
+}
+
+bool ReadFromFlowFileTestProcessor::readFlowFileWithContent(const std::string& content) const {
+  return std::find_if(flow_files_read_.begin(), flow_files_read_.end(), [&content](FlowFileData flow_file_data){ return flow_file_data.content_ == content; }) != flow_files_read_.end();
+}
+
+bool ReadFromFlowFileTestProcessor::readFlowFileWithAttribute(const std::string& key) const {
+  return std::find_if(flow_files_read_.begin(),
+                      flow_files_read_.end(),
+                      [&key](FlowFileData flow_file_data) { return flow_file_data.attributes_.contains(key); }) != flow_files_read_.end();
+}
+
+bool ReadFromFlowFileTestProcessor::readFlowFileWithAttribute(const std::string& key, const std::string& value) const {
+  return std::find_if(flow_files_read_.begin(),
+                      flow_files_read_.end(),
+                      [&key, &value](FlowFileData flow_file_data) { return flow_file_data.attributes_.contains(key) && flow_file_data.attributes_.at(key) == value; }) != flow_files_read_.end();
+}
+
+
 REGISTER_RESOURCE(ReadFromFlowFileTestProcessor, "ReadFromFlowFileTestProcessor (only for testing purposes)");
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/ReadFromFlowFileTestProcessor.h b/libminifi/test/ReadFromFlowFileTestProcessor.h
index 455c7e7..790478e 100644
--- a/libminifi/test/ReadFromFlowFileTestProcessor.h
+++ b/libminifi/test/ReadFromFlowFileTestProcessor.h
@@ -18,6 +18,7 @@
 #include <string>
 #include <memory>
 #include <vector>
+#include <map>
 
 #include "core/Processor.h"
 #include "core/ProcessSession.h"
@@ -49,17 +50,22 @@ class ReadFromFlowFileTestProcessor : public core::Processor {
   void initialize() override;
   void onUnSchedule() override;
 
-  bool readFlowFileWithContent(const std::string& content) const {
-    return std::find(flow_file_contents_.begin(), flow_file_contents_.end(), content) != flow_file_contents_.end();
-  }
+  bool readFlowFileWithContent(const std::string& content) const;
+  bool readFlowFileWithAttribute(const std::string& key) const;
+  bool readFlowFileWithAttribute(const std::string& key, const std::string& value) const;
 
   size_t numberOfFlowFilesRead() const {
-    return flow_file_contents_.size();
+    return flow_files_read_.size();
   }
 
  private:
+  struct FlowFileData {
+    FlowFileData(core::ProcessSession* session, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file);
+    std::string content_;
+    std::map<std::string, std::string> attributes_;
+  };
   std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ReadFromFlowFileTestProcessor>::getLogger();
-  std::vector<std::string> flow_file_contents_;
+  std::vector<FlowFileData> flow_files_read_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp
index 324cb37..941e508 100644
--- a/libminifi/test/TestBase.cpp
+++ b/libminifi/test/TestBase.cpp
@@ -491,6 +491,10 @@ bool TestPlan::runCurrentProcessorUntilFlowfileIsProduced(const std::chrono::sec
 
 std::size_t TestPlan::getNumFlowFileProducedByCurrentProcessor() {
   const auto& processor = processor_queue_.at(gsl::narrow<size_t>(location));
+  return getNumFlowFileProducedByProcessor(processor);
+}
+
+std::size_t TestPlan::getNumFlowFileProducedByProcessor(const std::shared_ptr<minifi::core::Processor>& processor) {
   std::vector<minifi::Connection*> connections = getProcessorOutboundConnections(processor);
   std::size_t num_flow_files = 0;
   for (auto connection : connections) {
diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h
index 10833b0..13bf4c8 100644
--- a/libminifi/test/TestBase.h
+++ b/libminifi/test/TestBase.h
@@ -218,6 +218,7 @@ class TestPlan {
 
   std::shared_ptr<minifi::core::FlowFile> getCurrentFlowFile();
   std::vector<minifi::Connection*> getProcessorOutboundConnections(const std::shared_ptr<minifi::core::Processor>& processor);
+  std::size_t getNumFlowFileProducedByProcessor(const std::shared_ptr<minifi::core::Processor>& processor);
   std::size_t getNumFlowFileProducedByCurrentProcessor();
   std::shared_ptr<minifi::core::FlowFile> getFlowFileProducedByCurrentProcessor();
 
diff --git a/win_build_vs.bat b/win_build_vs.bat
index 91991f6..f0fd8df 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -57,6 +57,7 @@ for %%x in (%*) do (
     if [%%~x] EQU [/A]           set build_AWS=ON
     if [%%~x] EQU [/SFTP]        set build_SFTP=ON
     if [%%~x] EQU [/PDH]         set build_PDH=ON
+    if [%%~x] EQU [/SPLUNK]      set build_SPLUNK=ON
     if [%%~x] EQU [/M]           set installer_merge_modules=ON
     if [%%~x] EQU [/Z]           set build_azure=ON
     if [%%~x] EQU [/N]           set build_nanofi=ON
@@ -73,7 +74,7 @@ for %%x in (%*) do (
 mkdir %builddir%
 pushd %builddir%\
 
-cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
+cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
 IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
 if [%cpack%] EQU [ON] (
     cpack -C %cmake_build_type%