You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by sz...@apache.org on 2022/06/28 16:33:16 UTC

[nifi-minifi-cpp] branch main updated: MINIFICPP-1843 Implement PostElasticsearch

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f2d977ec7 MINIFICPP-1843 Implement PostElasticsearch
f2d977ec7 is described below

commit f2d977ec7249757c93dbfdda10a3fc5fee25399c
Author: Martin Zink <ma...@apache.org>
AuthorDate: Tue Jun 28 18:22:04 2022 +0200

    MINIFICPP-1843 Implement PostElasticsearch
    
    Closes #1349
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 .github/workflows/ci.yml                           |   7 +-
 CMakeLists.txt                                     |   1 +
 CONTROLLERS.md                                     |  58 ++--
 PROCESSORS.md                                      |  31 +-
 README.md                                          |  57 ++--
 bootstrap.sh                                       |   4 +
 bstrp_functions.sh                                 |   6 +-
 cmake/DockerConfig.cmake                           |   5 +
 docker/DockerVerify.sh                             |   2 +-
 docker/Dockerfile                                  |   4 +-
 docker/bionic/Dockerfile                           |   4 +-
 docker/centos/Dockerfile                           |   4 +-
 docker/fedora/Dockerfile                           |   4 +-
 docker/focal/Dockerfile                            |   4 +-
 docker/requirements.txt                            |   2 +-
 .../integration/MiNiFi_integration_test_driver.py  |  25 ++
 .../integration/features/elasticsearch.feature     |  80 ++++++
 .../test/integration/features/opensearch.feature   |  83 ++++++
 .../controllers/ElasticsearchCredentialsService.py |  13 +
 .../integration/minifi/core/DockerTestCluster.py   |  27 ++
 .../minifi/core/DockerTestDirectoryBindings.py     |   2 +
 .../minifi/core/ElasticsearchContainer.py          |  41 +++
 docker/test/integration/minifi/core/ImageStore.py  |  10 +
 .../integration/minifi/core/OpensearchContainer.py |  38 +++
 .../minifi/core/SingleNodeDockerCluster.py         |   6 +
 .../minifi/processors/PostElasticsearch.py         |  30 ++
 .../integration/resources/elasticsearch/Dockerfile |   7 +
 .../resources/elasticsearch/certs/elastic_http.crt |  19 ++
 .../resources/elasticsearch/certs/elastic_http.key |  28 ++
 .../elasticsearch/certs/elastic_transport.crt      |  19 ++
 .../elasticsearch/certs/elastic_transport.key      |  28 ++
 .../elasticsearch/certs/minifi_client.crt          |  19 ++
 .../elasticsearch/certs/minifi_client.key          |  28 ++
 .../resources/elasticsearch/certs/root_ca.crt      |  19 ++
 .../resources/elasticsearch/elasticsearch.yml      |  28 ++
 .../integration/resources/opensearch/Dockerfile    |   5 +
 .../resources/opensearch/certs/admin-key.pem       |  28 ++
 .../resources/opensearch/certs/admin.pem           |  19 ++
 .../resources/opensearch/certs/root-ca.pem         |  21 ++
 .../resources/opensearch/opensearch.yml            |  16 ++
 docker/test/integration/steps/steps.py             |  77 +++++
 extensions/elasticsearch/CMakeLists.txt            |  36 +++
 .../ElasticsearchCredentialsControllerService.cpp  |  65 +++++
 .../ElasticsearchCredentialsControllerService.h    |  73 +++++
 extensions/elasticsearch/PostElasticsearch.cpp     | 319 +++++++++++++++++++++
 extensions/elasticsearch/PostElasticsearch.h       |  90 ++++++
 extensions/elasticsearch/tests/CMakeLists.txt      |  41 +++
 extensions/elasticsearch/tests/MockElastic.h       | 175 +++++++++++
 .../elasticsearch/tests/PostElasticsearchTests.cpp | 117 ++++++++
 extensions/http-curl/client/HTTPClient.cpp         |  23 +-
 extensions/http-curl/client/HTTPClient.h           |  15 +-
 extensions/http-curl/tests/CivetLibrary.h          |  40 +++
 extensions/http-curl/tests/TestServer.h            |  19 +-
 extensions/splunk/tests/MockSplunkHEC.h            |  19 +-
 .../tests/unit/ProcessorTests.cpp                  |   2 +-
 libminifi/include/utils/HTTPClient.h               |   3 +
 libminifi/include/utils/JsonCallback.h             |  20 ++
 libminifi/test/SingleProcessorTestController.h     |  19 +-
 win_build_vs.bat                                   |   3 +-
 59 files changed, 1882 insertions(+), 106 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c2bcab3ab..bdc8073b9 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -81,7 +81,7 @@ jobs:
         run: |
           PATH %PATH%;C:\Program Files (x86)\Windows Kits\10\bin\10.0.19041.0\x64
           PATH %PATH%;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\MSBuild\Current\Bin\Roslyn
-          win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /GCP /K /L /R /Z /N /RO /PR
+          win_build_vs.bat ..\b /64 /CI /S /A /PDH /SPLUNK /GCP /ELASTIC /K /L /R /Z /N /RO /PR
         shell: cmd
       - name: test
         run: cd ..\b && ctest --timeout 300 --parallel %NUMBER_OF_PROCESSORS% -C Release --output-on-failure
@@ -119,7 +119,7 @@ jobs:
           cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \
               -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \
               -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \
-              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON ..
+              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON -DENABLE_ELASTICSEARCH=ON ..
           make -j$(nproc) VERBOSE=1
       - name: test
         run: cd build && make test ARGS="--timeout 300 -j2 --output-on-failure"
@@ -166,7 +166,8 @@ jobs:
           cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON -DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \
               -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON -DENABLE_OPC=ON -DENABLE_OPENCV=ON \
               -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON -DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON -DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \
-              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
+              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON -DENABLE_PROCFS=ON -DENABLE_PROMETHEUS=ON -DENABLE_ELASTICSEARCH=ON \
+              -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
           cmake --build . --parallel $(nproc)
       - name: test
         run: cd build && make test ARGS="--timeout 300 -j8 --output-on-failure"
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ce5bc75fd..3457f5f34 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -116,6 +116,7 @@ option(ENABLE_OPENWSMAN "Enables the Openwsman extensions." OFF)
 option(ENABLE_AZURE "Enables Azure support." ON)
 option(ENABLE_ENCRYPT_CONFIG "Enables build of encrypt-config binary." ON)
 option(ENABLE_SPLUNK "Enable Splunk support" ON)
+option(ENABLE_ELASTICSEARCH "Enable Elasticsearch support" OFF)
 option(ENABLE_GCP "Enable Google Cloud support" ON)
 option(DOCKER_BUILD_ONLY "Disables all targets except docker build scripts. Ideal for systems without an up-to-date compiler." OFF)
 option(ENABLE_KUBERNETES "Enables the Kubernetes extensions." OFF)
diff --git a/CONTROLLERS.md b/CONTROLLERS.md
index 84aa57eea..0939a8846 100644
--- a/CONTROLLERS.md
+++ b/CONTROLLERS.md
@@ -20,6 +20,7 @@
 - [AWSCredentialsService](#AWSCredentialsService)
 - [AzureStorageCredentialsService](#AzureStorageCredentialsService)
 - [GCPCredentialsControllerService](#GCPCredentialsControllerService)
+- [ElasticsearchCredentialsControllerService](#ElasticsearchCredentialsControllerService)
 - [KubernetesControllerService](#kubernetesControllerService)
 
 ## AWSCredentialsService
@@ -35,12 +36,12 @@ controller service so that AWS credentials can be managed and controlled in a ce
 In the list below, the names of required properties appear in bold. Any other
 properties (not in bold) are considered optional.
 
-| Name | Default Value | Allowable Values | Description |
-| - | - | - | - |
-|**Use Default Credentials**|false||If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.|
-|Access Key|||Specifies the AWS Access Key|
-|Secret Key|||Specifies the AWS Secret Key|
-|Credentials File|||Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey|
+| Name                        | Default Value | Allowable Values | Description                                                                                                                                 |
+|-----------------------------|---------------|------------------|---------------------------------------------------------------------------------------------------------------------------------------------|
+| **Use Default Credentials** | false         |                  | If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc. |
+| Access Key                  |               |                  | Specifies the AWS Access Key                                                                                                                |
+| Secret Key                  |               |                  | Specifies the AWS Secret Key                                                                                                                |
+| Credentials File            |               |                  | Path to a file containing AWS access key and secret key in properties file format. Properties used: accessKey and secretKey                 |
 
 
 ## AzureStorageCredentialsService
@@ -55,14 +56,14 @@ controller service so that Azure storage credentials can be managed and controll
 In the list below, the names of required properties appear in bold. Any other
 properties (not in bold) are considered optional.
 
-| Name | Default Value | Allowable Values | Description |
-| - | - | - | - |
-|Storage Account Name|||The storage account name.|
-|Storage Account Key|||The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies.|
-|SAS Token|||Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.|
-|Common Storage Account Endpoint Suffix|||Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).|
-|Connection String|||Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.|
-|**Use Managed Identity Credentials**|false||Connection string used to connect to Azure Storage service. This overrides all other set credential properties.|
+| Name                                   | Default Value | Allowable Values | Description                                                                                                                                                                                                                 |
+|----------------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Storage Account Name                   |               |                  | The storage account name.                                                                                                                                                                                                   |
+| Storage Account Key                    |               |                  | The storage account key. This is an admin-like password providing access to every container in this account. It is recommended one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. |
+| SAS Token                              |               |                  | Shared Access Signature token. Specify either SAS Token (recommended) or Storage Account Key together with Storage Account Name if Managed Identity is not used.                                                            |
+| Common Storage Account Endpoint Suffix |               |                  | Storage accounts in public Azure always use a common FQDN suffix. Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).                            |
+| Connection String                      |               |                  | Connection string used to connect to Azure Storage service. This overrides all other set credential properties if Managed Identity is not used.                                                                             |
+| **Use Managed Identity Credentials**   | false         |                  | Connection string used to connect to Azure Storage service. This overrides all other set credential properties.                                                                                                             |
 
 ## GCPCredentialsControllerService
 
@@ -84,6 +85,25 @@ properties (not in bold) are considered optional.
 | Service Account JSON      |                                        |                                                                                                                                                             | The raw JSON containing a Service Account keyfile.                   |
 
 
+## ElasticsearchCredentialsControllerService
+
+### Description
+
+Elasticsearch Credentials Controller Service
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other
+properties (not in bold) are considered optional.
+
+
+| Name                   | Default Value | Allowable Values                 | Description                           |
+|------------------------|---------------|----------------------------------|---------------------------------------|
+| Username               |               |                                  | The username for basic authentication |
+| Password               |               |                                  | The password for basic authentication |
+| API Key                |               |                                  | The API Key to use                    |
+
+
 ## KubernetesControllerService
 
 ### Description
@@ -94,8 +114,8 @@ Controller service that provides access to the Kubernetes API.
 
 In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional.
 
-| Name | Default Value | Allowable Values | Description |
-| - | - | - | - |
-|Namespace Filter|default||Limit the output to pods in namespaces which match this regular expression|
-|Pod Name Filter|||If present, limit the output to pods the name of which matches this regular expression|
-|Container Name Filter|||If present, limit the output to containers the name of which matches this regular expression|
+| Name                  | Default Value | Allowable Values | Description                                                                                  |
+|-----------------------|---------------|------------------|----------------------------------------------------------------------------------------------|
+| Namespace Filter      | default       |                  | Limit the output to pods in namespaces which match this regular expression                   |
+| Pod Name Filter       |               |                  | If present, limit the output to pods the name of which matches this regular expression       |
+| Container Name Filter |               |                  | If present, limit the output to containers the name of which matches this regular expression |
diff --git a/PROCESSORS.md b/PROCESSORS.md
index e71eb88a4..71a7221c4 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -54,6 +54,7 @@
 - [MergeContent](#mergecontent)
 - [MotionDetector](#motiondetector)
 - [PerformanceDataMonitor](#performancedatamonitor)
+- [PostElasticsearch](#postelasticsearch)
 - [ProcFsMonitor](#procfsmonitor)
 - [PublishKafka](#publishkafka)
 - [PublishMQTT](#publishmqtt)
@@ -1659,6 +1660,35 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | All files are routed to success |
 
 
+## PostElasticsearch
+
+### Description
+
+An Elasticsearch/Opensearch post processor that uses the Elasticsearch/Opensearch _bulk REST API.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                           | Default Value | Allowable Values | Description                                                                                                                                                                                                                                                                                               |
+|------------------------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Action**                                     |               |                  | The type of the operation used to index (create, delete, index, update, upsert)<br/>**Supports Expression Language: true**                                                                                                                                                                                |
+| Max Batch Size                                 | 100           |                  | The maximum number of flow files to process at a time.                                                                                                                                                                                                                                                    |
+| **Elasticsearch Credentials Provider Service** |               |                  | The Controller Service used to obtain Elasticsearch credentials.                                                                                                                                                                                                                                          |
+| SSL Context Service                            |               |                  | The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.                                                                                                                                                                                                   |
+| **Hosts**                                      |               |                  | A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.<br/>**Supports Expression Language: true**                                                                                                                                               |
+| **Index**                                      |               |                  | The name of the index to use.<br/>**Supports Expression Language: true**                                                                                                                                                                                                                                  |
+| Identifier                                     |               |                  | If the Action is "index" or "create", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Actions, the attribute must evaluate to a non-empty value.<br/>**Supports Expression Language: true** |
+### Relationships
+
+| Name    | Description                                                                                   |
+|---------|-----------------------------------------------------------------------------------------------|
+| success | All flowfiles that succeed in being transferred into Elasticsearch go here.                   |
+| failure | All flowfiles that fail for reasons unrelated to server availability go to this relationship. |
+| error   | All flowfiles that Elasticsearch responded to with an error go to this relationship.          |
+
+
+
+
 ## ProcFsMonitor
 
 ### Description
@@ -1874,7 +1904,6 @@ In the list below, the names of required properties appear in bold. Any other pr
 | _gcs.encryption.algorithm_ | success      | The algorithm used to encrypt the object.                          |
 | _gcs.encryption.sha256_    | success      | The SHA256 hash of the key used to encrypt the object              |
 
-
 ## PutFile
 
 ### Description
diff --git a/README.md b/README.md
index a19b26144..0f6a5ab61 100644
--- a/README.md
+++ b/README.md
@@ -72,34 +72,35 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
 Through JNI extensions you can run NiFi processors using NARs. The JNI extension set allows you to run these Java processors. MiNiFi C++ will favor C++ implementations over Java implements. In the case where a processor is implemented in either language, the one in C++ will be selected; however, will remain transparent to the consumer.
 
 
-| Extension Set                    | Processors and Controller Services                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-|----------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| Archive Extensions               | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)                                                                                                                                         [...]
-| AWS                              | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)                                                                                                                                                                                                                       [...]
-| Azure                            | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobatorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](#fetchazureblobstorage)<br/>[ListAzureBlobStorage](#listazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchaz [...]
-| CivetWeb                         | [ListenHTTP](PROCESSORS.md#listenhttp)                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
-| CURL                             | [InvokeHTTP](PROCESSORS.md#invokehttp)                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
-| GPS                              | [GetGPS](PROCESSORS.md#getgps)                                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
-| Google Cloud Platform            | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject)                                                                                                                                                                                   [...]
-| Kafka                            | [PublishKafka](PROCESSORS.md#publishkafka)                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
-| Kubernetes                       | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService)                                                                                                                                                                                                                                                                                                                                                                                               [...]
-| JNI                              | **NiFi Processors**                                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-| MQTT                             | [ConsumeMQTT](PROCESSORS.md#consumeMQTT)<br/>[PublishMQTT](PROCESSORS.md#publishMQTT)                                                                                                                                                                                                                                                                                                                                                                                   [...]
-| OPC                              | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)                                                                                                                                                                                                                                                                                                                                                                                                                    [...]
-| OpenCV                           | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame)                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
-| OpenWSMAN                        | SourceInitiatedSubscriptionListener                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-| PCAP                             | [CapturePacket](PROCESSORS.md#capturepacket)                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
-| PDH (Windows only)               | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor)                                                                                                                                                                                                                                                                                                                                                                                                          [...]
-| ProcFs (Linux only)              | [ProcFsMonitor](PROCESSORS.md#procfsmonitor)                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
-| Scripting                        | [ExecuteScript](PROCESSORS.md#executescript)<br/>**Custom Python Processors**                                                                                                                                                                                                                                                                                                                                                                                           [...]
-| Sensors                          | GetEnvironmentalSensors<br/>GetMovementSensors                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
-| SFTP                             | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)                                                                                                                                                                                                                                                                                                                                                        [...]
-| SQL                              | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/>                                                                                                                                                                                                                                                                                                                               [...]
-| Splunk                           | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus)                                                                                                                                                                                                                                                                                                                                                   [...]
-| Systemd                          | [ConsumeJournald](PROCESSORS.md#consumejournald)                                                                                                                                                                                                                                                                                                                                                                                                                        [...]
-| Tensorflow                       | TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/>                                                                                                                                                                                                                                                                                                                                                                                                     [...]
-| USB Camera                       | [GetUSBCamera](PROCESSORS.md#getusbcamera)                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
-| Windows Event Log (Windows only) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)<br/>[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog)                                                                                                                                                                                                                                                                        [...]
+| Extension Set                    | Processors and Controller Services                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|----------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| Archive Extensions               | [ApplyTemplate](PROCESSORS.md#applytemplate)<br/>[CompressContent](PROCESSORS.md#compresscontent)<br/>[ManipulateArchive](PROCESSORS.md#manipulatearchive)<br/>[MergeContent](PROCESSORS.md#mergecontent)<br/>[FocusArchiveEntry](PROCESSORS.md#focusarchiveentry)<br/>[UnfocusArchiveEntry](PROCESSORS.md#unfocusarchiveentry)                                                                                                                                         [...]
+| AWS                              | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3)                                                                                                                                                                                                                       [...]
+| Azure                            | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](#fetchazureblobstorage)<br/>[ListAzureBlobStorage](#listazureblobstorage)<br/>[PutAzureDataLakeStorage](#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](#fetchaz [...]
+| CivetWeb                         | [ListenHTTP](PROCESSORS.md#listenhttp)                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
+| CURL                             | [InvokeHTTP](PROCESSORS.md#invokehttp)                                                                                                                                                                                                                                                                                                                                                                                                                                  [...]
+| Elasticsearch                    | [ElasticsearchCredentialsControllerService](CONTROLLERS.md#elasticsearchcredentialscontrollerservice)<br/>[PostElasticsearch](PROCESSORS.md#postelasticsearch)                                                                                                                                                                                                                                                                                                          [...]
+| GPS                              | [GetGPS](PROCESSORS.md#getgps)                                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
+| Google Cloud Platform            | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br>[GCPCredentialsControllerService](CONTROLLERS.md#GCPCredentialsControllerService)<br>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br>[PutGCSObject](PROCESSORS.md#putgcsobject)                                                                                                                                                                                   [...]
+| Kafka                            | [PublishKafka](PROCESSORS.md#publishkafka)                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
+| Kubernetes                       | [KubernetesControllerService](CONTROLLERS.md#kubernetesControllerService)                                                                                                                                                                                                                                                                                                                                                                                               [...]
+| JNI                              | **NiFi Processors**                                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+| MQTT                             | [ConsumeMQTT](PROCESSORS.md#consumeMQTT)<br/>[PublishMQTT](PROCESSORS.md#publishMQTT)                                                                                                                                                                                                                                                                                                                                                                                   [...]
+| OPC                              | [FetchOPCProcessor](PROCESSORS.md#fetchopcprocessor)                                                                                                                                                                                                                                                                                                                                                                                                                    [...]
+| OpenCV                           | [CaptureRTSPFrame](PROCESSORS.md#captureRTSPFrame)                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+| OpenWSMAN                        | SourceInitiatedSubscriptionListener                                                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+| PCAP                             | [CapturePacket](PROCESSORS.md#capturepacket)                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
+| PDH (Windows only)               | [PerformanceDataMonitor](PROCESSORS.md#performancedatamonitor)                                                                                                                                                                                                                                                                                                                                                                                                          [...]
+| ProcFs (Linux only)              | [ProcFsMonitor](PROCESSORS.md#procfsmonitor)                                                                                                                                                                                                                                                                                                                                                                                                                            [...]
+| Scripting                        | [ExecuteScript](PROCESSORS.md#executescript)<br/>**Custom Python Processors**                                                                                                                                                                                                                                                                                                                                                                                           [...]
+| Sensors                          | GetEnvironmentalSensors<br/>GetMovementSensors                                                                                                                                                                                                                                                                                                                                                                                                                          [...]
+| SFTP                             | [FetchSFTP](PROCESSORS.md#fetchsftp)<br/>[ListSFTP](PROCESSORS.md#listsftp)<br/>[PutSFTP](PROCESSORS.md#putsftp)                                                                                                                                                                                                                                                                                                                                                        [...]
+| SQL                              | [ExecuteSQL](PROCESSORS.md#executesql)<br/>[PutSQL](PROCESSORS.md#putsql)<br/>[QueryDatabaseTable](PROCESSORS.md#querydatabasetable)<br/>                                                                                                                                                                                                                                                                                                                               [...]
+| Splunk                           | [PutSplunkHTTP](PROCESSORS.md#putsplunkhttp)<br/>[QuerySplunkIndexingStatus](PROCESSORS.md#querysplunkindexingstatus)                                                                                                                                                                                                                                                                                                                                                   [...]
+| Systemd                          | [ConsumeJournald](PROCESSORS.md#consumejournald)                                                                                                                                                                                                                                                                                                                                                                                                                        [...]
+| Tensorflow                       | TFApplyGraph<br/>TFConvertImageToTensor<br/>TFExtractTopLabels<br/>                                                                                                                                                                                                                                                                                                                                                                                                     [...]
+| USB Camera                       | [GetUSBCamera](PROCESSORS.md#getusbcamera)                                                                                                                                                                                                                                                                                                                                                                                                                              [...]
+| Windows Event Log (Windows only) | [CollectorInitiatedSubscription](PROCESSORS.md#collectorinitiatedsubscription)<br/>[ConsumeWindowsEventLog](PROCESSORS.md#consumewindowseventlog)<br/>[TailEventLog](PROCESSORS.md#taileventlog)                                                                                                                                                                                                                                                                        [...]
 
  Please see our [Python guide](extensions/script/README.md) on how to write Python processors and use them within MiNiFi C++.
 
diff --git a/bootstrap.sh b/bootstrap.sh
index 4baf98f56..6c842d3c9 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -334,9 +334,13 @@ add_option NANOFI_ENABLED ${FALSE} "ENABLE_NANOFI"
 set_dependency PYTHON_ENABLED NANOFI_ENABLED
 
 add_option SPLUNK_ENABLED ${TRUE} "ENABLE_SPLUNK"
+set_dependency SPLUNK_ENABLED HTTP_CURL_ENABLED
 
 add_option GCP_ENABLED ${TRUE} "ENABLE_GCP"
 
+add_option ELASTIC_ENABLED ${FALSE} "ENABLE_ELASTICSEARCH"
+set_dependency ELASTIC_ENABLED HTTP_CURL_ENABLED
+
 add_option PROCFS_ENABLED ${TRUE} "ENABLE_PROCFS"
 
 add_disabled_option PROMETHEUS_ENABLED ${FALSE} "ENABLE_PROMETHEUS"
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 3af419d98..778c5dd51 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -387,6 +387,7 @@ show_supported_features() {
   echo "AC. Google Cloud Support .......$(print_feature_status GCP_ENABLED)"
   echo "AD. ProcFs Support .............$(print_feature_status PROCFS_ENABLED)"
   echo "AE. Prometheus Support .........$(print_feature_status PROMETHEUS_ENABLED)"
+  echo "AF. Elasticsearch Support ......$(print_feature_status ELASTIC_ENABLED)"
   echo "****************************************"
   echo "            Build Options."
   echo "****************************************"
@@ -409,7 +410,7 @@ show_supported_features() {
 
 read_feature_options(){
   local choice
-  echo -n "Enter choice [A-Z or AA-AE or 1-7] "
+  echo -n "Enter choice [A-Z or AA-AF or 1-7] "
   read -r choice
   choice=$(echo "${choice}" | tr '[:upper:]' '[:lower:]')
   case $choice in
@@ -446,6 +447,7 @@ read_feature_options(){
     ac) ToggleFeature GCP_ENABLED ;;
     ad) ToggleFeature PROCFS_ENABLED ;;
     ae) ToggleFeature PROMETHEUS_ENABLED ;;
+    af) ToggleFeature ELASTIC_ENABLED ;;
     1) ToggleFeature TESTS_ENABLED ;;
     2) EnableAllFeatures ;;
     3) ToggleFeature JNI_ENABLED;;
@@ -464,7 +466,7 @@ read_feature_options(){
       fi
       ;;
     q) exit 0;;
-    *) echo -e "${RED}Please enter an option A-Z or AA-AE or 1-7...${NO_COLOR}" && sleep 2
+    *) echo -e "${RED}Please enter an option A-Z or AA-AF or 1-7...${NO_COLOR}" && sleep 2
   esac
 }
 
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index f8f7e7d8d..7532ec56a 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -46,6 +46,7 @@ add_custom_target(
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_GCP=${ENABLE_GCP}
+        -c ENABLE_ELASTICSEARCH=${ENABLE_ELASTICSEARCH}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
@@ -119,6 +120,7 @@ add_custom_target(
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_GCP=${ENABLE_GCP}
+        -c ENABLE_ELASTICSEARCH=${ENABLE_ELASTICSEARCH}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
@@ -171,6 +173,7 @@ add_custom_target(
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_GCP=${ENABLE_GCP}
+        -c ENABLE_ELASTICSEARCH=${ENABLE_ELASTICSEARCH}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
@@ -223,6 +226,7 @@ add_custom_target(
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_GCP=${ENABLE_GCP}
+        -c ENABLE_ELASTICSEARCH=${ENABLE_ELASTICSEARCH}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
@@ -275,6 +279,7 @@ add_custom_target(
         -c ENABLE_NANOFI=${ENABLE_NANOFI}
         -c ENABLE_SPLUNK=${ENABLE_SPLUNK}
         -c ENABLE_GCP=${ENABLE_GCP}
+        -c ENABLE_ELASTICSEARCH=${ENABLE_ELASTICSEARCH}
         -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
         -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
         -c ENABLE_KUBERNETES=${ENABLE_KUBERNETES}
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index 5b1ef6498..4c409a57f 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -67,7 +67,7 @@ TEST_DIRECTORY="${docker_dir}/test/integration"
 export TEST_DIRECTORY
 
 # Add --no-logcapture to see logs interleaved with the test output
-BEHAVE_OPTS=(-f pretty --logging-level INFO --logging-clear-handlers)
+BEHAVE_OPTS=(-f pretty --logging-level INFO --logging-clear-handlers --tags ~@no-ci)
 
 # Specify feature or scenario to run a specific test e.g.:
 # behave "${BEHAVE_OPTS[@]}" "features/file_system_operations.feature"
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 632a782b3..65d271c0c 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -52,6 +52,7 @@ ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
 ARG ENABLE_SPLUNK=OFF
 ARG ENABLE_GCP=OFF
+ARG ENABLE_ELASTICSEARCH=OFF
 ARG ENABLE_TEST_PROCESSORS=OFF
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
@@ -127,7 +128,8 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_ALL="${ENABLE_ALL}" -DENABL
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
     -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
-    -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
+    -DENABLE_ELASTICSEARCH="${ENABLE_ELASTICSEARCH}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" \
+    -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
     make -j "$(nproc)" package && \
     tar -xzvf "${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C "${MINIFI_BASE_DIR}"
 
diff --git a/docker/bionic/Dockerfile b/docker/bionic/Dockerfile
index b2030e130..c152d05a2 100644
--- a/docker/bionic/Dockerfile
+++ b/docker/bionic/Dockerfile
@@ -61,6 +61,7 @@ ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
 ARG ENABLE_SPLUNK=OFF
 ARG ENABLE_GCP=OFF
+ARG ENABLE_ELASTICSEARCH=OFF
 ARG ENABLE_SYSTEMD=ON
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
@@ -97,5 +98,6 @@ RUN cd $MINIFI_BASE_DIR \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
     -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
-    -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
+    -DENABLE_ELASTICSEARCH="${ENABLE_ELASTICSEARCH}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" \
+    -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
diff --git a/docker/centos/Dockerfile b/docker/centos/Dockerfile
index da6ad170f..d1c90fcd9 100644
--- a/docker/centos/Dockerfile
+++ b/docker/centos/Dockerfile
@@ -59,6 +59,7 @@ ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
 ARG ENABLE_SPLUNK=ON
 ARG ENABLE_GCP=ON
+ARG ENABLE_ELASTICSEARCH=OFF
 ARG ENABLE_SYSTEMD=ON
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
@@ -91,6 +92,7 @@ RUN cd $MINIFI_BASE_DIR \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
     -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
-    -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
+    -DENABLE_ELASTICSEARCH="${ENABLE_ELASTICSEARCH}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" \
+    -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && scl enable devtoolset-10 -- make -j "$(nproc)" package
 
diff --git a/docker/fedora/Dockerfile b/docker/fedora/Dockerfile
index 0989b03f4..ae60d6c51 100644
--- a/docker/fedora/Dockerfile
+++ b/docker/fedora/Dockerfile
@@ -63,6 +63,7 @@ ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
 ARG ENABLE_SPLUNK=ON
 ARG ENABLE_GCP=ON
+ARG ENABLE_ELASTICSEARCH=OFF
 ARG ENABLE_SYSTEMD=ON
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
@@ -97,6 +98,7 @@ RUN cd $MINIFI_BASE_DIR \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
     -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
-    -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
+    -DENABLE_ELASTICSEARCH="${ENABLE_ELASTICSEARCH}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" \
+    -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
 
diff --git a/docker/focal/Dockerfile b/docker/focal/Dockerfile
index d2c8e743a..74cfe7949 100644
--- a/docker/focal/Dockerfile
+++ b/docker/focal/Dockerfile
@@ -62,6 +62,7 @@ ARG ENABLE_ENCRYPT_CONFIG=ON
 ARG ENABLE_NANOFI=OFF
 ARG ENABLE_SPLUNK=OFF
 ARG ENABLE_GCP=OFF
+ARG ENABLE_ELASTICSEARCH=OFF
 ARG ENABLE_SYSTEMD=ON
 ARG DISABLE_CURL=OFF
 ARG DISABLE_JEMALLOC=ON
@@ -97,5 +98,6 @@ RUN cd $MINIFI_BASE_DIR \
     -DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}" -DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
     -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}" -DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}" -DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
     -DENABLE_KUBERNETES="${ENABLE_KUBERNETES}" -DENABLE_GCP="${ENABLE_GCP}" -DENABLE_PROCFS="${ENABLE_PROCFS}" -DENABLE_PROMETHEUS="${ENABLE_PROMETHEUS}" \
-    -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
+    -DENABLE_ELASTICSEARCH="${ENABLE_ELASTICSEARCH}" -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" \
+    -DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. \
     && make -j "$(nproc)" package
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 19b50056e..37e226827 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -4,7 +4,7 @@ docker==5.0.0
 kafka-python==2.0.2
 confluent-kafka==1.7.0
 PyYAML==5.4.1
-m2crypto==0.37.1
+m2crypto==0.38.0
 watchdog==2.1.2
 pyopenssl==21.0.0
 azure-storage-blob==12.9.0
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
index d668beb08..ec392aeb0 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -77,6 +77,16 @@ class MiNiFi_integration_test:
         assert self.wait_for_container_startup_to_finish('splunk')
         assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token')
 
+    def start_elasticsearch(self):
+        self.cluster.acquire_container('elasticsearch', 'elasticsearch')
+        self.cluster.deploy('elasticsearch')
+        assert self.wait_for_container_startup_to_finish('elasticsearch')
+
+    def start_opensearch(self):
+        self.cluster.acquire_container('opensearch', 'opensearch')
+        self.cluster.deploy('opensearch')
+        assert self.wait_for_container_startup_to_finish('opensearch')
+
     def start(self):
         logging.info("MiNiFi_integration_test start")
         self.cluster.deploy_flow()
@@ -220,6 +230,21 @@ class MiNiFi_integration_test:
     def check_empty_gcs_bucket(self, gcs_container_name):
         assert self.cluster.is_gcs_bucket_empty(gcs_container_name)
 
+    def check_empty_elastic(self, elastic_container_name):
+        assert self.cluster.is_elasticsearch_empty(elastic_container_name)
+
+    def elastic_generate_apikey(self, elastic_container_name):
+        return self.cluster.elastic_generate_apikey(elastic_container_name)
+
+    def create_doc_elasticsearch(self, elastic_container_name, index_name, doc_id):
+        assert self.cluster.create_doc_elasticsearch(elastic_container_name, index_name, doc_id)
+
+    def check_elastic_field_value(self, elastic_container_name, index_name, doc_id, field_name, field_value):
+        assert self.cluster.check_elastic_field_value(elastic_container_name, index_name, doc_id, field_name, field_value)
+
+    def add_elastic_user_to_opensearch(self, container_name):
+        assert self.cluster.add_elastic_user_to_opensearch(container_name)
+
     def check_minifi_log_contents(self, line, timeout_seconds=60, count=1):
         self.check_container_log_contents("minifi-cpp", line, timeout_seconds, count)
 
diff --git a/docker/test/integration/features/elasticsearch.feature b/docker/test/integration/features/elasticsearch.feature
new file mode 100644
index 000000000..f2ebca810
--- /dev/null
+++ b/docker/test/integration/features/elasticsearch.feature
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@no-ci  # Elasticsearch container requires more RAM than what the CI environment has
+Feature: Managing documents on Elasticsearch with PostElasticsearch
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario Outline: MiNiFi instance indexes a document on Elasticsearch using Basic Authentication
+    Given an Elasticsearch server is set up and running
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "{ "field1" : "value1" }" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "my_id"
+    And the "Action" property of the PostElasticsearch processor is set to <action>
+    And a SSL context service is set up for PostElasticsearch and Elasticsearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "{ "field1" : "value1" }" is placed in the monitored directory in less than 20 seconds
+    And Elasticsearch has a document with "my_id" in "my_index" that has "value1" set in "field1"
+
+    Examples:
+    | action   |
+    | "index"  |
+    | "create" |
+
+  Scenario: MiNiFi instance deletes a document from Elasticsearch using API Key authentication
+    Given an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index"
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "hello world" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "preloaded_id"
+    And the "Action" property of the PostElasticsearch processor is set to "delete"
+    And a SSL context service is set up for PostElasticsearch and Elasticsearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with ApiKey
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "hello world" is placed in the monitored directory in less than 20 seconds
+    And Elasticsearch is empty
+
+  Scenario: MiNiFi instance partially updates a document in Elasticsearch using Basic Authentication
+    Given an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "{ "field2" : "value2" }" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "preloaded_id"
+    And the "Action" property of the PostElasticsearch processor is set to "update"
+    And a SSL context service is set up for PostElasticsearch and Elasticsearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "{ "field2" : "value2" }" is placed in the monitored directory in less than 20 seconds
+    And Elasticsearch has a document with "preloaded_id" in "my_index" that has "value1" set in "field1"
+    And Elasticsearch has a document with "preloaded_id" in "my_index" that has "value2" set in "field2"
diff --git a/docker/test/integration/features/opensearch.feature b/docker/test/integration/features/opensearch.feature
new file mode 100644
index 000000000..0e45e5edd
--- /dev/null
+++ b/docker/test/integration/features/opensearch.feature
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@no-ci  # Opensearch container requires more RAM than what the CI environment has
+Feature: PostElasticsearch works on Opensearch (Opensearch doesn't support API Keys)
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario Outline: MiNiFi instance creates a document on Opensearch using Basic Authentication
+    Given an Opensearch server is set up and running
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "{ "field1" : "value1" }" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch:9200"
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "my_id"
+    And the "Action" property of the PostElasticsearch processor is set to <action>
+    And a SSL context service is set up for PostElasticsearch and Opensearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "{ "field1" : "value1" }" is placed in the monitored directory in less than 20 seconds
+    And Opensearch has a document with "my_id" in "my_index" that has "value1" set in "field1"
+
+    Examples:
+      | action   |
+      | "index"  |
+      | "create" |
+
+  Scenario: MiNiFi instance deletes a document from Opensearch using Basic Authentication
+    Given an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index"
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "hello world" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch:9200"
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "preloaded_id"
+    And the "Action" property of the PostElasticsearch processor is set to "delete"
+    And a SSL context service is set up for PostElasticsearch and Opensearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "hello world" is placed in the monitored directory in less than 20 seconds
+    And Opensearch is empty
+
+  Scenario: MiNiFi instance partially updates a document in Opensearch using Basic Authentication
+    Given an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"
+    And a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "{ "field2" : "value2" }" is present in "/tmp/input"
+    And a PostElasticsearch processor
+    And the "Hosts" property of the PostElasticsearch processor is set to "https://opensearch:9200"
+    And the "Index" property of the PostElasticsearch processor is set to "my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to "preloaded_id"
+    And the "Action" property of the PostElasticsearch processor is set to "update"
+    And a SSL context service is set up for PostElasticsearch and Opensearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "{ "field2" : "value2" }" is placed in the monitored directory in less than 20 seconds
+    And Opensearch has a document with "preloaded_id" in "my_index" that has "value1" set in "field1"
+    And Opensearch has a document with "preloaded_id" in "my_index" that has "value2" set in "field2"
diff --git a/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py b/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
new file mode 100644
index 000000000..7923a4abb
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/ElasticsearchCredentialsService.py
@@ -0,0 +1,13 @@
+from ..core.ControllerService import ControllerService
+
+
+class ElasticsearchCredentialsService(ControllerService):
+    def __init__(self, api_key=None, name=None):
+        super(ElasticsearchCredentialsService, self).__init__(name=name)
+
+        self.service_class = 'ElasticsearchCredentialsControllerService'
+        if api_key is None:
+            self.properties['Username'] = "elastic"
+            self.properties['Password'] = "password"
+        else:
+            self.properties['API Key'] = api_key
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 1cab875ea..525cf6af5 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -256,6 +256,33 @@ class DockerTestCluster(SingleNodeDockerCluster):
         (code, output) = self.client.containers.get(container_name).exec_run(["ls", "/storage/test-bucket"])
         return code == 0 and output == b''
 
+    def is_elasticsearch_empty(self, container_name):
+        (code, output) = self.client.containers.get(container_name).exec_run(["curl", "-u", "elastic:password", "-k", "-XGET", "https://localhost:9200/_search"])
+        return code == 0 and b'"hits":[]' in output
+
+    def create_doc_elasticsearch(self, container_name, index_name, doc_id):
+        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
+                                                                              "curl -u elastic:password -k -XPUT https://localhost:9200/" + index_name + "/_doc/" + doc_id + " -H Content-Type:application/json -d'{\"field1\":\"value1\"}'"])
+        return code == 0 and ('"_id":"' + doc_id + '"').encode() in output
+
+    def check_elastic_field_value(self, container_name, index_name, doc_id, field_name, field_value):
+        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
+                                                                              "curl -u elastic:password -k -XGET https://localhost:9200/" + index_name + "/_doc/" + doc_id])
+        return code == 0 and (field_name + '":"' + field_value).encode() in output
+
+    def elastic_generate_apikey(self, elastic_container_name):
+        (code, output) = self.client.containers.get(elastic_container_name).exec_run(["/bin/bash", "-c",
+                                                                                      "curl -u elastic:password -k -XPOST https://localhost:9200/_security/api_key -H Content-Type:application/json -d'{\"name\":\"my-api-key\",\"expiration\":\"1d\",\"role_descriptors\":{\"role-a\": {\"cluster\": [\"all\"],\"index\": [{\"names\": [\"my_index\"],\"privileges\": [\"all\"]}]}}}'"])
+        output = output.decode(self.get_stdout_encoding())
+        output_lines = output.splitlines()
+        result = json.loads(output_lines[-1])
+        return result["encoded"]
+
+    def add_elastic_user_to_opensearch(self, container_name):
+        (code, output) = self.client.containers.get(container_name).exec_run(["/bin/bash", "-c",
+                                                                              'curl -u admin:admin -k -XPUT https://opensearch:9200/_plugins/_security/api/internalusers/elastic -H Content-Type:application/json -d\'{"password":"password","backend_roles":["admin"]}\''])
+        return code == 0 and '"status":"CREATED"'.encode() in output
+
     def query_postgres_server(self, postgresql_container_name, query, number_of_rows):
         (code, output) = self.client.containers.get(postgresql_container_name).exec_run(["psql", "-U", "postgres", "-c", query])
         output = output.decode(self.get_stdout_encoding())
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 3781138a9..2a4c6934a 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -54,6 +54,8 @@ class DockerTestDirectoryBindings:
         shutil.copytree(test_dir + "/resources/python", self.data_directories[self.test_id]["resources_dir"] + "/python")
         shutil.copytree(test_dir + "/resources/opcua", self.data_directories[self.test_id]["resources_dir"] + "/opcua")
         shutil.copytree(test_dir + "/resources/lua", self.data_directories[self.test_id]["resources_dir"] + "/lua")
+        shutil.copytree(test_dir + "/resources/elasticsearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/elasticsearch")
+        shutil.copytree(test_dir + "/resources/opensearch/certs", self.data_directories[self.test_id]["resources_dir"] + "/opensearch")
 
     def get_data_directories(self, test_id):
         return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/ElasticsearchContainer.py b/docker/test/integration/minifi/core/ElasticsearchContainer.py
new file mode 100644
index 000000000..878603de5
--- /dev/null
+++ b/docker/test/integration/minifi/core/ElasticsearchContainer.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+from .Container import Container
+
+
+class ElasticsearchContainer(Container):
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'elasticsearch', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        return '"current.health":"GREEN"'
+
+    def deploy(self):
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running Elasticsearch docker container...')
+        self.client.containers.run(
+            self.image_store.get_image(self.get_engine()),
+            detach=True,
+            name=self.name,
+            environment=[
+                "ELASTIC_PASSWORD=password",
+            ],
+            network=self.network.name)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/ImageStore.py b/docker/test/integration/minifi/core/ImageStore.py
index 8db07d264..ee87b3332 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -60,6 +60,10 @@ class ImageStore:
             image = self.__build_tcp_client_image()
         elif container_engine == "prometheus":
             image = self.__build_prometheus_image()
+        elif container_engine == "elasticsearch":
+            image = self.__build_elasticsearch_image()
+        elif container_engine == "opensearch":
+            image = self.__build_opensearch_image()
         else:
             raise Exception("There is no associated image for " + container_engine)
 
@@ -184,6 +188,12 @@ class ImageStore:
     def __build_prometheus_image(self):
         return self.__build_image_by_path(self.test_dir + "/resources/prometheus", 'minifi-prometheus')
 
+    def __build_elasticsearch_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/elasticsearch", 'elasticsearch')
+
+    def __build_opensearch_image(self):
+        return self.__build_image_by_path(self.test_dir + "/resources/opensearch", 'opensearch')
+
     def __build_image(self, dockerfile, context_files=[]):
         conf_dockerfile_buffer = BytesIO()
         docker_context_buffer = BytesIO()
diff --git a/docker/test/integration/minifi/core/OpensearchContainer.py b/docker/test/integration/minifi/core/OpensearchContainer.py
new file mode 100644
index 000000000..07f462b71
--- /dev/null
+++ b/docker/test/integration/minifi/core/OpensearchContainer.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+from .Container import Container
+
+
+class OpensearchContainer(Container):
+    """Container with engine 'opensearch'; runs the image the image store returns for that engine."""
+    def __init__(self, name, vols, network, image_store, command=None):
+        super().__init__(name, 'opensearch', vols, network, image_store, command)
+
+    def get_startup_finished_log_entry(self):
+        # Log text used as the "startup finished" marker (per the method name).
+        return 'Hot-reloading of audit configuration is enabled'
+
+    def deploy(self):
+        """Run the container detached on self.network, unless set_deployed() returns a falsy value."""
+        if not self.set_deployed():
+            return
+
+        logging.info('Creating and running Opensearch docker container...')
+        image = self.image_store.get_image(self.get_engine())
+        self.client.containers.run(image, detach=True, name=self.name, network=self.network.name)
+        logging.info('Added container \'%s\'', self.name)
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index 15e0660f1..fd99e61ea 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -33,6 +33,8 @@ from .PostgreSQLServerContainer import PostgreSQLServerContainer
 from .MqttBrokerContainer import MqttBrokerContainer
 from .OPCUAServerContainer import OPCUAServerContainer
 from .SplunkContainer import SplunkContainer
+from .ElasticsearchContainer import ElasticsearchContainer
+from .OpensearchContainer import OpensearchContainer
 from .SyslogUdpClientContainer import SyslogUdpClientContainer
 from .SyslogTcpClientContainer import SyslogTcpClientContainer
 from .MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster
@@ -119,6 +121,10 @@ class SingleNodeDockerCluster(Cluster):
             return self.containers.setdefault(name, OPCUAServerContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == 'splunk':
             return self.containers.setdefault(name, SplunkContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == 'elasticsearch':
+            return self.containers.setdefault(name, ElasticsearchContainer(name, self.vols, self.network, self.image_store, command))
+        elif engine == 'opensearch':
+            return self.containers.setdefault(name, OpensearchContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == "syslog-udp-client":
             return self.containers.setdefault(name, SyslogUdpClientContainer(name, self.vols, self.network, self.image_store, command))
         elif engine == "syslog-tcp-client":
diff --git a/docker/test/integration/minifi/processors/PostElasticsearch.py b/docker/test/integration/minifi/processors/PostElasticsearch.py
new file mode 100644
index 000000000..4fd959eca
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PostElasticsearch.py
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.Processor import Processor
+
+
+class PostElasticsearch(Processor):
+    """PostElasticsearch processor node with fixed Hosts, Index and Identifier properties."""
+    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+        # NOTE(review): the dict default is shared between calls; here it is only passed through.
+        fixed_properties = {
+            'Hosts': 'https://elasticsearch:9200',
+            'Index': 'test',
+            'Identifier': '${filename}'
+        }
+        relationships = ['success', 'failure', 'error']
+        super().__init__('PostElasticsearch', properties=fixed_properties, auto_terminate=relationships, schedule=schedule)
diff --git a/docker/test/integration/resources/elasticsearch/Dockerfile b/docker/test/integration/resources/elasticsearch/Dockerfile
new file mode 100644
index 000000000..a7ca77146
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/Dockerfile
@@ -0,0 +1,7 @@
+FROM elasticsearch:8.2.2
+COPY elasticsearch.yml /usr/share/elasticsearch/config/elasticsearch.yml
+COPY certs/elastic_http.key /usr/share/elasticsearch/config/certs/elastic_http.key
+COPY certs/elastic_http.crt /usr/share/elasticsearch/config/certs/elastic_http.crt
+COPY certs/elastic_transport.key /usr/share/elasticsearch/config/certs/elastic_transport.key
+COPY certs/elastic_transport.crt /usr/share/elasticsearch/config/certs/elastic_transport.crt
+COPY certs/root_ca.crt /usr/share/elasticsearch/config/certs/root_ca.crt
diff --git a/docker/test/integration/resources/elasticsearch/certs/elastic_http.crt b/docker/test/integration/resources/elasticsearch/certs/elastic_http.crt
new file mode 100644
index 000000000..90b292edd
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/elastic_http.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDFzCCAf+gAwIBAgIEA0as9DANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDDAdy
+b290IENBMB4XDTIyMDYwODE1MjAxNVoXDTMyMDYwNTE1MjAxNVowGDEWMBQGA1UE
+AwwNZWxhc3RpY3NlYXJjaDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
+AKRdem556wIlQGdtEHJM0FsKGcF2paVXl2uhsCLBduknaG+nVBqLQlw1iJh3Q2ru
+6C90hCI7F4Q4xTjeY1+MRzh1YjD/SwbhkUwFU3KB5QOQ+yJ/bM+sAWM+90PRlV8Z
+2WxV8YdGrXlO7PzI4QfWG/9n3krA6KhDofBK5oT0JgYA/QaLFHm6bRFV6E9Octf+
+5fcQMB1xPzC/XT/4aCVMIVsI+NDpn77x00UTLKLfQLzOwEwnyVKjOSghVpKfmF1u
+dt78FumAkFWl9W5MAll4lFTLmD4wgJLh8CkXhRfGRrKzSTTt4Gp8Nu3ZzIKwSqaP
+jwfthx0q+xQx9G1AW6BPvx0CAwEAAaNvMG0wCQYDVR0TBAIwADAdBgNVHQ4EFgQU
+2jmj7l5rSw0yVb/vlWAYkK/YBwkwHwYDVR0jBBgwFoAU2jmj7l5rSw0yVb/vlWAY
+kK/YBwkwEwYDVR0lBAwwCgYIKwYBBQUHAwEwCwYDVR0PBAQDAgeAMA0GCSqGSIb3
+DQEBCwUAA4IBAQBgMbCHkuLGDZKxhO337KvOlaeIBpEoKFpuuSA8GIF4qD4IkFlB
+dqk/2j6iTcRkfzfZSqoMpvJlUppdA3Qg3Xmly5IaqbXD8kj32fOSDsm+CQJb8Yyo
+Fb2YMWgQi/OgB99DOuPn43vlfjW7+dXNJhjg8QSLMxLMPEedEjZPrXe/7BvbJEH3
+IK7jZjuJzTuhzAZ6vAdPpMIKhaXJ9FdEU6zeubyo9GcfAjLiQt+MzuGsraRA5qxO
+SUYmw1pYQz+x9fgvNluaLq8mzaxI6vHVQCckoVFnahL1I2qjktrqr4lWd77fRdoV
+KtVEzApc8sOdWnK+hpgPja8+vqsiCG9PLAU/
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/elastic_http.key b/docker/test/integration/resources/elasticsearch/certs/elastic_http.key
new file mode 100644
index 000000000..847ad81a7
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/elastic_http.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCkXXpueesCJUBn
+bRByTNBbChnBdqWlV5drobAiwXbpJ2hvp1Qai0JcNYiYd0Nq7ugvdIQiOxeEOMU4
+3mNfjEc4dWIw/0sG4ZFMBVNygeUDkPsif2zPrAFjPvdD0ZVfGdlsVfGHRq15Tuz8
+yOEH1hv/Z95KwOioQ6HwSuaE9CYGAP0GixR5um0RVehPTnLX/uX3EDAdcT8wv10/
++GglTCFbCPjQ6Z++8dNFEyyi30C8zsBMJ8lSozkoIVaSn5hdbnbe/BbpgJBVpfVu
+TAJZeJRUy5g+MICS4fApF4UXxkays0k07eBqfDbt2cyCsEqmj48H7YcdKvsUMfRt
+QFugT78dAgMBAAECggEAA8WGBNlKVxBuGRF2clD96kuMihUZZEu4FJxaCNTMpY6l
+xa4exBnUSikLpd1SuuDu4Kvj86R3JoMQ+ibsO9SMYSjHDzMZ0Hdw7erPXo4e0wVK
+0KCrlAZb5SBG7JOiFv5mWqc24Bud6RiHtUiSFVZzP69CpA980eOfaH6nfynaYi0u
+g5Gp7hBFGvvTa5cAZ9msEVXEhVI4FeBAaZXdN/lrsoHfXzMhNvzZeLyDA24lsBQ3
+pYY//V7LT5vRRUmERFZT9FH2eK6YIbTy284B3H1SV50Lc+AP2OehKoTEC8rADJp6
+jB28o0lXTHe0NYFAQpqpwBoDkFwCu8v5kLxuOe74lwKBgQDfl38F4yHEcEBqMBbk
+X9/PNMFO05N8/mc2WpLS5mDaTTuQdn+ybovpx0my4BTZr7xcaH7ntSHkhRRwJDlA
+7r2D7P+icoEcsFDO9HaTCJ3f39qv7qNLOBj5edN5qZuKDXj3cmU7jnzYlfESKYsi
+T1sQV4XvbLu/EbBY8IrMr51pMwKBgQC8MFfd8syqATEAXQ2ClAaz93oO7IKWSgGC
+/aLXzjO11IkxlUHEUzIrKGPrFpo09uav+FUC7os63QVRJg8lNecePnZu2dWihZIc
+Ro9idsGnGstU7qFtl+40Mrl+U6yCQKUAJWPUqX/D8pBnhfrIEikrvcBP0PbkqDD4
+K7VAr2hWbwKBgQCD2RwoZpWk9uSlOZJmPTsHPv628wDH9mT6olJuVzfJwz1apOYM
+XC/ZPZFyD7172KtkeqJLFH7V+LvinRm99op+3ySnsIn+VuJwB6xca/pc5+5XTIuy
+cXQYuQRBfAiiUeuBT03u3sk0Fsc/hIvtQYpbiqEKXL4hn9azu7S9mtliDQKBgE5i
+zJa0Za+CY4jFObVkNw6LTJoexZ4YIV0QjN3bFArvBl5Sz1wZU1JKtqPbfEwHc7tk
+cvSPrArOuUI92h0jwKsGVYqCzcuuW1fPxDW9RZh3Pq6X30GpAR1Y647FB5wRlSjk
+ahrQp1lVFfUpB3aCqgeAODSG0/AWZp9YhVz2Mz2dAoGBANLeln9LyZQXY5NEoF1Z
+12NU8+xzXdepqt6kBdoSIt+2eAucBNZHCfu1DrEMSIQENr/dLFzCiC2GrcczRFsW
+2oQ0t3S5tR5B7zHaDSEPFf+DBIgNbPxIerYnJ9m1TuQPIHyDlG3du3C5dVyVqNed
+fiaGzQ2FmjqPCk39iWEjBL5T
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/elastic_transport.crt b/docker/test/integration/resources/elasticsearch/certs/elastic_transport.crt
new file mode 100644
index 000000000..4367809d3
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/elastic_transport.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIC/jCCAeagAwIBAgIEBHOyVjANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDDAdy
+b290IENBMB4XDTIyMDYwODE1MjAxNVoXDTMyMDYwNTE1MjAxNVowFDESMBAGA1UE
+AwwJMTI3LjAuMC4xMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtR/I
+wYDdyZy1iW2y9bi+14p7roO6o3EXWNxCPNlEfCbEQlLWaK3Nq45+bTzvMCN9YZvi
+OjjqisIVsiL1oV6MaIBbGaIdtIvDcUF+wGcqTorzVIHOttAL43zRW0RrJLFeMW7E
+7RydntDDeAq5jcjP+DwCRFWBRHYFZhcMbNSyqMHPghRk704MxRp0bFuJCeadIy8/
+oBluiEa0J7ajizAL+IQYOdCdteUI73PCUFF33WvueIC+REz6CFV+Hzgmn6F2e04W
+Y/h0gbXnyjR2tEqCcK68O7eOujQGqz6Ai/6yE+1xRRDAESUIrNAep3qXecnrguWj
+l9LX28BVO/epWaaR8QIDAQABo1owWDAJBgNVHRMEAjAAMB0GA1UdDgQWBBTaOaPu
+XmtLDTJVv++VYBiQr9gHCTAfBgNVHSMEGDAWgBTaOaPuXmtLDTJVv++VYBiQr9gH
+CTALBgNVHQ8EBAMCB4AwDQYJKoZIhvcNAQELBQADggEBABTIcrRsCbtyat2Cdyl2
+ei9hd+Bow6SBR3fJiuwIWQi4UXs9Fg9ts015dRNZf5i4H3POaZYkI69fCIObXzIq
+YwVKS2p5nC5RbY4icgsLkPRXSts2HhuZKbbuuxw4nM7v4iHpj394lujNcX2Ky5zc
+m8jAQSQvh/HnvLWb8DhWCV8yp8lYF8u7PLefm8TixA/fPyfZsT+BYLlIbQ6r9ry/
+kXOYaQ0JGCSMrMGAvL4nxugoNF2LwNTqxxC1/UOCzn52A8BwCb5tvU1B2+k3oYKq
+on8KH4NdGq1kX+gf3/BZ6vOFsf/UmpTrWPdW/3Fow9MzIUkUPf7QgsBlM0m95+Fm
+cFM=
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/elastic_transport.key b/docker/test/integration/resources/elasticsearch/certs/elastic_transport.key
new file mode 100644
index 000000000..d7b99d97b
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/elastic_transport.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC1H8jBgN3JnLWJ
+bbL1uL7Xinuug7qjcRdY3EI82UR8JsRCUtZorc2rjn5tPO8wI31hm+I6OOqKwhWy
+IvWhXoxogFsZoh20i8NxQX7AZypOivNUgc620AvjfNFbRGsksV4xbsTtHJ2e0MN4
+CrmNyM/4PAJEVYFEdgVmFwxs1LKowc+CFGTvTgzFGnRsW4kJ5p0jLz+gGW6IRrQn
+tqOLMAv4hBg50J215Qjvc8JQUXfda+54gL5ETPoIVX4fOCafoXZ7ThZj+HSBtefK
+NHa0SoJwrrw7t466NAarPoCL/rIT7XFFEMARJQis0B6nepd5yeuC5aOX0tfbwFU7
+96lZppHxAgMBAAECggEABQHQBVZcg7hIlvCQNGSdCcHV3AAqq20FyKXxM6bIcOu4
+I98Aq/oXdRKEdTdG0OQDNv+pHcD8wMUxMmj94j0rzQo8SYwXnZrrKRkYvY2tf2hq
+XolA8lt7ehD5MT/6oXXmnK7Q29lSDKTQYhETexr4RJ165YEfCKC8YgBS0jpTaEv5
+DalYBxgaJOU6pBX/8xRk8rgwuRad0tYcWuHGTWnB3pCzw6+4QlIohhlaXVn1RG2p
+4B5hRf66y7SuAA8S1F+tlmZQogZ5XTPGeEequzqMOy4/uWaALRlZN+tVG8H7Q6jM
+SaSqLgDDMiXzCX3s0jmlGx89sF3+7Qe/YkEXCujZjwKBgQDQZc5Gs+ioeA0Rnr0Z
+MuI4hSxF9LFAxp6qxT5xcVOTUyf/ZeoVgUpMjQWjvQoNyub1KPWSBorumWxKzXWN
+UUNUFn/Al7tpNC2U2YQ+KD+SHjdPC5A8DFzA95qITauXUZumIdpdWyeLpx840gxy
+rSt0Uizb9uN5MFgoydjp5e8TBwKBgQDefyQUUhPKQRBTzTM1l4ZAxvYGd8FJHgoD
+qsn7oc4cLmL8kzDo6GWNx/jqVwGB8o25j4q1y22tDJTV34ynfUhgFNoKuip4a6W4
+KG3MrKCPuAxRMp2JDasoeyeLzY/hsfLpyIIRqM2noghR/tY3sh4NDmt06SWhKRP1
+OMWwwKudRwKBgQC2AuS5XbJrSYboh8LSul1YTf/QDZmuYIr9zY3vGLKM7sj1SHa9
+V5Gj8Fsnmhio2TkEkESCyFIZeEhbwLzVs07hp1Dmzd7yUI8q/RuCCjcTkG5wDEJq
+H8X3B4qhV2zxmWYfIFwOw2lzkIA1aUdT9SkJzCe/6g7crrugzEI2bwtyiwKBgQDH
+t2JgZyhidTK9tkTQRzn4LV+rgJVcF32vuv9hQuVqwGyUAV92wRxfPu/9EapFxb5X
+oTRGaNsC6Xcz3102/qsXq7/ovyDeROpVOt0MSqw5NWDBBekIm99WD+34fMU5Nu0v
+0o6GYXpwjW5gg5owFk8JMzdbkuf3GJNUm+g/90JUgwKBgDKtFlNjhqIxNPcjgGpS
+7ElwDs51VG0gPSaJqaIIy+/Cq8k4pPBwy2Cgyr1kvX8rGuadeC4rN/8UFEw1A79/
+wZj5RD+zTfa6pxtiK2JFN7mubQ+eygkngbvdB8sp8bqcoSt8N3iqnMRGcyaIBwzE
+6GN8QZtQiCJ17Nd9I/akONbS
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/minifi_client.crt b/docker/test/integration/resources/elasticsearch/certs/minifi_client.crt
new file mode 100644
index 000000000..17e182158
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/minifi_client.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDGTCCAgGgAwIBAgIEAzJaSjANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDDAdy
+b290IENBMB4XDTIyMDYwODE1MjAxNVoXDTMyMDYwNTE1MjAxNVowGjEYMBYGA1UE
+AwwPbWluaWZpLWNwcC1mbG93MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKC
+AQEAioSZCHxWY9RWIwdLG+OLWAOdZF5HRKjD+41oA11zT6O/702AvYI7e9oGAb0S
+8z80C92UMWEtJapPfG0eDXPH9F6KPxgT2sMhbY46Lwd8Sdv1HowcR95ULIwNsn06
+boIIFF7/0/8paE9XWOO9ZlhATO/cgsG6EHeMfZr6J3KwC2kfJQSRW9wnBS67StCi
+9vJf0bFKjuhHDTOWtqA8P3Mv6ey/44tOpMAO2hW4r/wif/n+qljEGjllDWFYoBMQ
+pZeGUUb+N8HfgQggWg017yMVQXRSadH4bF1HDaDYOL+E/0spg3bSN0gYJGWnAtqt
+mIx4A7fHzqJNJxwQF+RKL+lo9QIDAQABo28wbTAJBgNVHRMEAjAAMB0GA1UdDgQW
+BBTaOaPuXmtLDTJVv++VYBiQr9gHCTAfBgNVHSMEGDAWgBTaOaPuXmtLDTJVv++V
+YBiQr9gHCTATBgNVHSUEDDAKBggrBgEFBQcDAjALBgNVHQ8EBAMCB4AwDQYJKoZI
+hvcNAQELBQADggEBALYUrK+h1yJNSDLvlV1lPENelLGFj7XyQqM3IPK1CMpYqVlR
+NfeYt5/gHv96b4XijOMNr1vgJz1T10x+LbqQaDMzOWGFg8uyCe64xOH39UUWkF72
+mTftCOaszCO/stkT6jffa492lV0bY6+q3OpJKCbxJbHQQQkftpSyivXvblnXRO1B
+WXAg7YvhMsca+xyImcJQ5nmqEilpbB4gH2Kena2NMdFy/FTGqwRSJFQNevPYxvc8
+Ok9VcphPmn6u8vZG5zb0KeaknitUDg+PlJKv7OPR7QC99KfgVKAzlyZ4GFcHkMVN
+ahH8dxTCk7sOcF5MBecLtUBhRm0u4m66cQH/gXo=
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/minifi_client.key b/docker/test/integration/resources/elasticsearch/certs/minifi_client.key
new file mode 100644
index 000000000..52e32e335
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/minifi_client.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCKhJkIfFZj1FYj
+B0sb44tYA51kXkdEqMP7jWgDXXNPo7/vTYC9gjt72gYBvRLzPzQL3ZQxYS0lqk98
+bR4Nc8f0Xoo/GBPawyFtjjovB3xJ2/UejBxH3lQsjA2yfTpugggUXv/T/yloT1dY
+471mWEBM79yCwboQd4x9mvoncrALaR8lBJFb3CcFLrtK0KL28l/RsUqO6EcNM5a2
+oDw/cy/p7L/ji06kwA7aFbiv/CJ/+f6qWMQaOWUNYVigExCll4ZRRv43wd+BCCBa
+DTXvIxVBdFJp0fhsXUcNoNg4v4T/SymDdtI3SBgkZacC2q2YjHgDt8fOok0nHBAX
+5Eov6Wj1AgMBAAECggEAFKH/sZXjwGcGlZ0mgPb58JhJP4QTlZSV7Mh5b9bhsEEx
++9FNlezqM16cenHRes41UHcm9dwFv5CI2n3edh6C8FGMwi0x9+oM9qay4AJ7bv0G
+nBA+vi3N1/zcpXOl2oQf8/janpdpcZIv7putwKrwmQU5kXiN4JoOIT+Od4XL10cP
+KxLMJSqBae5U4nHmYc09rLm059uLuClnb6nb493vk0WrQ8JYx0PhFWo/qsUbidKz
+ZJiMHMLD1ygCK6XDZ9dXjm3s4J6L8w24pc4AnCDpEHedb6COWJCCGxTVRXH5w2TW
+gPF8dXZKOWMhlYrrms6kFFCEbwyN1MadXixmDLf7mQKBgQC1b59GlMiVB7HZ64gU
+rVwpcja2kVCITyGe3T9nsfCiHK+eR3OH8EzGY3jHPoAsZVFQXFuxgnoIQ9URsIYO
+hBX6+6KKvYzQqW16bZSYrH5Xvkrka2Sgew2sexoolauHbqws2nD8qqhL5Iafcywt
+YAJN9j86ENddxb3e5HbXRxwc1wKBgQDDca+UT6J6AoeO9KKxVKnKEXO3j9vwXb7m
+kM7uT+NIfGIlS84FE3gmz19/ck2Wx/6BvfeZdJWLoCuUhAiZ7TIMamztMg209NOP
+9dejwq6sln9Xq7dmtCJFWs0q1KlU2voXn9g4SNdmt9ZZoAhDIDJ//YLQXK9gbIcZ
+G0FIFRFDEwKBgFG2r/lN0Pitun/3ABav7S4Nbm7TC34YhX/TDVdaYJHicYXLkFDM
+/QbsjdzOXVOhXfnfXryXkvZd9nw9EGNBQdUDt7TcoaroY5IcACyyo0RJQ9mUNIHC
+aeoglytoHRSl141r5C8nfRyRE8CEUoUtFFygZuFz7EsjX9we8RcLw7GJAoGAMpM1
+qzxcQBRtfcnCW/8bnMWdWy/n6zBnGQT592NG3lgJkbsnCds66VgaT0GF4HHKY0qE
+SvqHREVjYaA1FZfyrpHPtCWStHPyVzt33NGhH9omA8rUv8f3YlkX5HB5jRLJStm7
+Ov1meBc0aabm3G/1gXdZpY+Gdn4/C3kpEH0bJW8CgYBS0ZOmLm70axiQFsxj6zgr
+zYzI637Q0AehUif+7PUr5fc4jIAsM2mAGg1lMICvthf69GkIRs4JUywmRK+VmiOc
+JuVHFTH3XmyklNuhT7OJ8srV+K81V64gBo0I+Mp2S5dvQtePBKManUN2r46CUFM+
+ScBnD0bENAWY9GX7TTmCqQ==
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/elasticsearch/certs/root_ca.crt b/docker/test/integration/resources/elasticsearch/certs/root_ca.crt
new file mode 100644
index 000000000..a2cfdb6e3
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/certs/root_ca.crt
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIC/zCCAeegAwIBAgIEA9yGAzANBgkqhkiG9w0BAQsFADASMRAwDgYDVQQDDAdy
+b290IENBMB4XDTIyMDYwODE1MjAxNVoXDTMyMDYwNTE1MjAxNVowEjEQMA4GA1UE
+AwwHcm9vdCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANncdhc0
+ZKY8PCUo3KZDHS0N+KE0IHzzB90LnzSa5fHzDjAQJxbJwcw7ukr+OlfH2e5rMSVm
+koMpxIQKcNIF6QrT+nD5WZZdIQGdnKBWXYMcT9ao1wna+YhS1HdU2jI3s0WeRuMf
+0OeoWXq+reZkxK06g6CJDqsFfFW/lzuIJPJTRmv3o2kjmhr8nL9xEp17cEUNFsdx
+UfYGUK74OgYrttQkXglaiMjDvEVrC4E0yWsHXukA6ypmb1j0D4Q4bNXnQ2j/YjPs
+vkZxQ//RkXPWfuPum9QpLcqsedYUBwwXuFLqx85ybE6oLI4H4X0TXNcQ54CeYeQn
+MvfrGi+3j5IbJ2cCAwEAAaNdMFswHQYDVR0OBBYEFNo5o+5ea0sNMlW/75VgGJCv
+2AcJMB8GA1UdIwQYMBaAFNo5o+5ea0sNMlW/75VgGJCv2AcJMAwGA1UdEwQFMAMB
+Af8wCwYDVR0PBAQDAgEGMA0GCSqGSIb3DQEBCwUAA4IBAQDFrcL4kBz/hAP6InxR
+AShK6PCLo/UX9VAICbYf5/ZTze04Do9Ly0JjbmP+uYH1jSInppADmytuXYl4FWk6
+z6WUU7qn1hgdKL/TWqODRU23/gVojVy+JEeBrU1qMlKeqOMfI5EaEkQ1ymGERV0N
+8U+w5OYZQhhlx4kbO50RVxrtk+s+MLL56+Io2xzVQ2P9r+4jwBFQ3NB8GayWS30K
+R4T033AkpU0rruE+mDcHCRfjeOPeCyugujSFDXPzprK5THurXQiSS98Ny/Y7Btcs
+s3PoIvjS2197RLZTFhK6oio/lAourAmyIh/1DxmR/736t03XGrhX9Nm8//b6kSDE
+9FFY
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/elasticsearch/elasticsearch.yml b/docker/test/integration/resources/elasticsearch/elasticsearch.yml
new file mode 100644
index 000000000..91e2cbd3c
--- /dev/null
+++ b/docker/test/integration/resources/elasticsearch/elasticsearch.yml
@@ -0,0 +1,28 @@
+cluster.name: "docker-cluster"
+network.host: 0.0.0.0
+discovery.type: "single-node"
+
+# Enable security features
+xpack.security.enabled: true
+
+xpack.security.enrollment.enabled: true
+
+# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents
+xpack.security.http.ssl:
+  enabled: true
+  key: certs/elastic_http.key
+  certificate: certs/elastic_http.crt
+  certificate_authorities: ["certs/root_ca.crt"]
+
+
+# Enable encryption and mutual authentication between cluster nodes
+xpack.security.transport.ssl:
+  enabled: true
+  verification_mode: certificate
+  key: certs/elastic_transport.key
+  certificate: certs/elastic_transport.crt
+  certificate_authorities: ["certs/root_ca.crt"]
+
+# No cluster.initial_master_nodes entry is needed here:
+# discovery.type is "single-node" (see above), so this node forms the cluster by itself.
+
diff --git a/docker/test/integration/resources/opensearch/Dockerfile b/docker/test/integration/resources/opensearch/Dockerfile
new file mode 100644
index 000000000..2cb10a893
--- /dev/null
+++ b/docker/test/integration/resources/opensearch/Dockerfile
@@ -0,0 +1,5 @@
+# Pinned instead of :latest so test runs are reproducible; 2.0.1 is assumed to be the release this setup was written against - TODO confirm.
+FROM opensearchproject/opensearch:2.0.1
+COPY opensearch.yml /usr/share/opensearch/config/opensearch.yml
+# The PEM files referenced by opensearch.yml are copied into the same config directory.
+COPY certs/admin.pem certs/admin-key.pem certs/root-ca.pem /usr/share/opensearch/config/
diff --git a/docker/test/integration/resources/opensearch/certs/admin-key.pem b/docker/test/integration/resources/opensearch/certs/admin-key.pem
new file mode 100644
index 000000000..37ae977a4
--- /dev/null
+++ b/docker/test/integration/resources/opensearch/certs/admin-key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0Gz3ZI/Vwk8nR
+D8w49lH/yC1LAXTh56wx2zqfgKbC/GOBSoZqYKQG9Ss6W0Y/Tt+witKDiuE6HP4s
+d73nbriqhsv41CWIsDpaHDFPsj20r+NzVlAFBgUY5oYl0xb8uqKyVvPAekn/s9T5
+1HXknLoFGzS67tK/HXRhglWCf/3/8O2R3rHB6Qo2UORnNUpdf8PaJuuTfhwzZWYv
+roJB5CJPu27GlMOQ/g2u5R9F2zKBrXw9ud2CZdX+BukQcoDvPAbPq9OVQ7ODc3gW
+YdRj7zzWPrnU0D/rwjH81ZFG801/BIpetpakvvPoz4gZrsLD4fd7vP2otVf6kuuw
+OGoUHPifAgMBAAECggEATm2wbI6Ski/hCddZxJJK2dnXE1ryhW8HlnxTVmLE+5LB
+V+tWhhtIQjlf1h0qBWZ41R1Wl26SLmayrjWEbjPSWXuydcwX/iJTfqrZNE23Pif7
+oV18Ifvhm1rDHtSBpZM2RvfDVFVMRPuP6izBG8i93nxe5WobPpr6c324c8f9mQYa
+3mD1jJm97wFOV8Ccs253GImkW5DXpPZWHOaooa8vI+UOb4cd0miHBUQ5Ui0tjMnW
+A3ivLu8Rh5viPWZ44QtmTTpDy74UxxwMlxr6IXBzTSxTzWAu9hAo9BFN73ObKpsh
+8R8emUDPB8Qzmu+Jqszn7vudCuIHLRwUcUXbWDNBkQKBgQDsxPAmsrZhHif3PvqL
+mOFUqfgbkzJulgBsgruRRrmE5TqpSHnj76LgHaXxDH8JHOvRY/aABg0VWGnLurVN
+sksqvfNZIyD47upMCG3gCS8CikSWbB+Sdy88LsKWK//mf6Vszzb/h8LGRYyuWGpE
+bY/Hkaf+cVQd4HXvO5oXT94KMQKBgQDCvCHE2s76Mo2NcpXDXW1W5Fvjr0he6AX5
+aKjsG24QgBIHmCHHD3EGoQTZo59llt7OCLZNj/QefjZY5l/5MNr1vuR0hn2OzHxB
+6xbQy/y7Aa8Aw4Bh+grqduTC3QqoKSFZivxbU7o2XFgTWQfrnYSZLqs2yHqXDk95
+qQ1H5gSrzwKBgC/ra+9pHCBrygrwZCT8vB42iFCMahiXo15S10N/3OxYuJaqZIEN
+bxmM5pjyvoNJpzCp2qnuj5dSXZcuIqOnAi+UYjwYMAQx1pms2xAMy8bn1RncqEcs
+eBLmI9vg69nq28A58BenknQuf1qJ6ngO9JqfkDCfHzcsxrG0jmtJbtcBAoGAFnat
+TpAPID2EprR5Ijg8zLE7CGDH8GrDhanVEqd3Yt/jNI0QGG5nk2Qvswt077sfqJTI
+stnQR1Q34LlyiMRh/ccvg2GgSkC8dL4xu5RMayYeke/7d1HowHNGoFPvTsav2ix2
+P/LAoFS2tESo7T0WdrUzevZVbedws5AUwB6Am3ECgYEA6+Ka9zgFdwRQWnFZnzo8
+AHVgEfrGySnPhk1TIZaNCCQfW0PbWPat4de7pdmQ8b25Xkmtv5Dvx1DZt7nEXGvi
+CFlrJan7DOXsLCFU4wrOKrAzjX5dyxmOlMQ6yDcUfXNo56nlmu0OzMdXSkeE1NF4
+E178foT4/NXi3o1QPsF4ZOU=
+-----END PRIVATE KEY-----
diff --git a/docker/test/integration/resources/opensearch/certs/admin.pem b/docker/test/integration/resources/opensearch/certs/admin.pem
new file mode 100644
index 000000000..615cca17e
--- /dev/null
+++ b/docker/test/integration/resources/opensearch/certs/admin.pem
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDJjCCAg4CFBL2toHOSVHiwhbu9ZZF9GNSmP3BMA0GCSqGSIb3DQEBCwUAMEUx
+CzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl
+cm5ldCBXaWRnaXRzIFB0eSBMdGQwHhcNMjIwNjE0MTQzOTM4WhcNMzIwNDIyMTQz
+OTM4WjBaMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UE
+CgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRMwEQYDVQQDDApvcGVuc2VhcmNo
+MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtBs92SP1cJPJ0Q/MOPZR
+/8gtSwF04eesMds6n4CmwvxjgUqGamCkBvUrOltGP07fsIrSg4rhOhz+LHe95264
+qobL+NQliLA6WhwxT7I9tK/jc1ZQBQYFGOaGJdMW/LqislbzwHpJ/7PU+dR15Jy6
+BRs0uu7Svx10YYJVgn/9//Dtkd6xwekKNlDkZzVKXX/D2ibrk34cM2VmL66CQeQi
+T7tuxpTDkP4NruUfRdsyga18PbndgmXV/gbpEHKA7zwGz6vTlUOzg3N4FmHUY+88
+1j651NA/68Ix/NWRRvNNfwSKXraWpL7z6M+IGa7Cw+H3e7z9qLVX+pLrsDhqFBz4
+nwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQCrKip0y1U4Op0GlOCFn742PXz7jvmX
+RiZ5iJbi0JO9MfVhKvdJH7aMzwzJy5+vqv9tvgbWUBgQrIzINi9+rHJi16oFYsxO
+/41+pFB1ePwlbq8YeEVj79jve+/N/2gJU6Ax5kRgEmJ56ECoM+fhvn9qihPMfViq
+QnvCkhXXDgFrXs5zYgCxRqMKSZOIO3ARwGP9I/SqAV4KSoL+nn9n2WIV19kSOrhV
+BIokJ6ghG1y/eFHs6/YuQJq8Xv0dsSYS8AhPB2ScmpODnzeENbjs9XxvbhWn6mUV
+03TqUwVUi8waNAIosH5ZWOAMcJY9BZt+nHrwdFvPGOwlDlTolluBK4NW
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/opensearch/certs/root-ca.pem b/docker/test/integration/resources/opensearch/certs/root-ca.pem
new file mode 100644
index 000000000..d46147394
--- /dev/null
+++ b/docker/test/integration/resources/opensearch/certs/root-ca.pem
@@ -0,0 +1,21 @@
+-----BEGIN CERTIFICATE-----
+MIIDazCCAlOgAwIBAgIUOIX81x7AWgLdLCJp8yU1D/oYRXgwDQYJKoZIhvcNAQEL
+BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMjA2MTQxNDM4MTRaFw0zMjA0
+MjIxNDM4MTRaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
+HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
+AQUAA4IBDwAwggEKAoIBAQC4cE+oIKlQZ6APRyTq/hUhpA2epXpMznwRr8u2hdQF
+fuc2MFhc1IQP4UT6HPpBuVOPNwM+eUIRfL3SFMwb4pemTEm9PoUe9yHFrWMqvojf
+H0IFtXymSjAoaBcNqELELgcb3JMKtDVZsZ5PUEkacvUXj2q4hnkIxbGz1lwQhCNw
+gEvoiITyQHsfSgrjYO9gUmuM434UKLR4V3Al+Z/C6UjGMSYpoVbOKhdjMJ2Q8vtp
+NAurC/K/bsc67FyWZNURAirOCom1Bs/Qs/z1rtT5gQ/4HxPKrG3ognJAg2n+IV6g
+R0GtBeImWUuR8sSyHKkgL6lblPb0hwveyTwYq3mKInh5AgMBAAGjUzBRMB0GA1Ud
+DgQWBBRAcV/RuVoJUrbakhKEi4RG7Scz1TAfBgNVHSMEGDAWgBRAcV/RuVoJUrba
+khKEi4RG7Scz1TAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQB/
+J4ecHjsRkfxaqMOULbXzyrHvgg/pB/5N4u2g30E4W7iNj+7t9sEiMbeB3fML1dk9
+aeVJR8kh6OsFSG6MMIUATwaE3sGQfW54wgnfc9rLAIN/uo735secF8SjE6UgqZjE
+rPLnT/J9M3qmhUexo3YJZy4nzbR18wgIBaqynC3/WCbljt+FgIRXd+LUs8525XiI
+u7zbeBUErDD+tOnEUfHAjfJ49+GWWcM9Hg7s27iLTykHw0pWgMTzJY13t2ycBz6E
+dMx6KTGIAOG6kJeWM6eGiT/q7A++f2ZNZ7rJLg9is7rGRTUf4oJk2rlaiYxH7Jwo
+IfO0VUGkZSaBJjqB/Svg
+-----END CERTIFICATE-----
diff --git a/docker/test/integration/resources/opensearch/opensearch.yml b/docker/test/integration/resources/opensearch/opensearch.yml
new file mode 100644
index 000000000..6baea9cb8
--- /dev/null
+++ b/docker/test/integration/resources/opensearch/opensearch.yml
@@ -0,0 +1,16 @@
+cluster.name: "docker-cluster"
+network.host: 0.0.0.0
+discovery.type: single-node
+
+plugins.security.ssl.http.enabled: true
+plugins.security.ssl.http.pemcert_filepath: admin.pem
+plugins.security.ssl.http.pemkey_filepath: admin-key.pem
+plugins.security.ssl.http.pemtrustedcas_filepath: root-ca.pem
+plugins.security.ssl.transport.pemcert_filepath: admin.pem
+plugins.security.ssl.transport.pemkey_filepath: admin-key.pem
+plugins.security.ssl.transport.pemtrustedcas_filepath: root-ca.pem
+plugins.security.ssl.transport.enforce_hostname_verification: false
+plugins.security.allow_unsafe_democertificates: true
+plugins.security.allow_default_init_securityindex: true
+
+plugins.security.restapi.roles_enabled: ["all_access", "security_rest_api_access"]
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
index 46e79b1de..3b4f43ace 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -21,6 +21,7 @@ from minifi.core.Funnel import Funnel
 
 from minifi.controllers.SSLContextService import SSLContextService
 from minifi.controllers.GCPCredentialsControllerService import GCPCredentialsControllerService
+from minifi.controllers.ElasticsearchCredentialsService import ElasticsearchCredentialsService
 from minifi.controllers.ODBCService import ODBCService
 from minifi.controllers.KubernetesControllerService import KubernetesControllerService
 
@@ -410,6 +411,62 @@ def step_impl(context):
     context.test.acquire_container("fake-gcs-server", "fake-gcs-server")
 
 
+# elasticsearch
+@given('an Elasticsearch server is set up and running')
+@given('an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index"')
+@given('an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"')
+def step_impl(context):  # one implementation backs all three phrasings above
+    context.test.start_elasticsearch()
+    context.test.create_doc_elasticsearch("elasticsearch", "my_index", "preloaded_id")  # NOTE(review): "value1"/"field1" are not passed here; presumably fixed inside the helper - confirm
+
+
+# opensearch
+@given('an Opensearch server is set up and running')
+@given('an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index"')
+@given('an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"')
+def step_impl(context):  # one implementation backs all three phrasings above
+    context.test.start_opensearch()
+    context.test.add_elastic_user_to_opensearch("opensearch")  # runs before the document is created
+    context.test.create_doc_elasticsearch("opensearch", "my_index", "preloaded_id")  # NOTE(review): "value1"/"field1" are not passed here; presumably fixed inside the helper - confirm
+
+
+@given(u'a SSL context service is set up for PostElasticsearch and Elasticsearch')
+def step_impl(context):  # attaches an SSLContextService (client cert, client key, CA cert) to the PostElasticsearch node
+    tls_service = SSLContextService(
+        cert='/tmp/resources/elasticsearch/minifi_client.crt',
+        ca_cert='/tmp/resources/elasticsearch/root_ca.crt',
+        key='/tmp/resources/elasticsearch/minifi_client.key')
+    processor = context.test.get_node_by_name("PostElasticsearch")
+    processor.controller_services.append(tls_service)
+    processor.set_property("SSL Context Service", tls_service.name)
+
+
+@given(u'a SSL context service is set up for PostElasticsearch and Opensearch')
+def step_impl(context):
+    # Only a CA certificate is given to the SSLContextService here (no client cert or key).
+    tls_service = SSLContextService(ca_cert='/tmp/resources/opensearch/root-ca.pem')
+    processor = context.test.get_node_by_name("PostElasticsearch")
+    processor.controller_services.append(tls_service)
+    processor.set_property("SSL Context Service", tls_service.name)
+
+
+@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication')
+def step_impl(context):  # the service is created with no arguments and attached to the PostElasticsearch node
+    credentials = ElasticsearchCredentialsService()
+    processor = context.test.get_node_by_name("PostElasticsearch")
+    processor.controller_services.append(credentials)
+    processor.set_property("Elasticsearch Credentials Provider Service", credentials.name)
+
+
+@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch with ApiKey')
+def step_impl(context):
+    # The key returned by elastic_generate_apikey("elasticsearch") is passed to the credentials service.
+    credentials = ElasticsearchCredentialsService(context.test.elastic_generate_apikey("elasticsearch"))
+    processor = context.test.get_node_by_name("PostElasticsearch")
+    processor.controller_services.append(credentials)
+    processor.set_property("Elasticsearch Credentials Provider Service", credentials.name)
+
+
 # splunk hec
 @given("a Splunk HEC is set up and running")
 def step_impl(context):
@@ -836,3 +893,23 @@ def step_impl(context, metric_class, timeout_seconds):
 @then("\"{metric_class}\" processor metric is published to the Prometheus server in less than {timeout_seconds:d} seconds for \"{processor_name}\" processor")
 def step_impl(context, metric_class, timeout_seconds, processor_name):
     context.test.check_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+
+
+@then("Elasticsearch is empty")
+def step_impl(context):  # delegates to check_empty_elastic with the name "elasticsearch"
+    context.test.check_empty_elastic("elasticsearch")
+
+
+@then(u'Elasticsearch has a document with "{doc_id}" in "{index}" that has "{value}" set in "{field}"')
+def step_impl(context, doc_id, index, value, field):  # the four quoted placeholders arrive as the arguments of the same name
+    context.test.check_elastic_field_value("elasticsearch", index_name=index, doc_id=doc_id, field_name=field, field_value=value)
+
+
+@then("Opensearch is empty")
+def step_impl(context):  # same check_empty_elastic helper as the Elasticsearch step, called with "opensearch"
+    context.test.check_empty_elastic("opensearch")
+
+
+@then(u'Opensearch has a document with "{doc_id}" in "{index}" that has "{value}" set in "{field}"')
+def step_impl(context, doc_id, index, value, field):  # same check_elastic_field_value helper as the Elasticsearch step, called with "opensearch"
+    context.test.check_elastic_field_value("opensearch", index_name=index, doc_id=doc_id, field_name=field, field_value=value)
diff --git a/extensions/elasticsearch/CMakeLists.txt b/extensions/elasticsearch/CMakeLists.txt
new file mode 100644
index 000000000..ad8c6f5b7
--- /dev/null
+++ b/extensions/elasticsearch/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if (NOT (ENABLE_ALL OR ENABLE_ELASTICSEARCH))  # nothing below runs unless one of these options is set
+  return()
+endif()
+
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES  "*.cpp")  # every .cpp file in this directory goes into the library
+
+add_library(minifi-elasticsearch SHARED ${SOURCES})
+
+target_include_directories(minifi-elasticsearch PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/http-curl")  # adds the http-curl extension directory to the include path
+target_link_libraries(minifi-elasticsearch ${LIBMINIFI})
+target_link_libraries(minifi-elasticsearch minifi-http-curl)  # links against the minifi-http-curl target
+
+register_extension(minifi-elasticsearch "ELASTICSEARCH EXTENSIONS" ELASTICSEARCH-EXTENSIONS "This enables elasticsearch support" "extensions/elasticsearch/tests")
+
+register_extension_linter(minifi-elasticsearch-extensions-linter)
diff --git a/extensions/elasticsearch/ElasticsearchCredentialsControllerService.cpp b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.cpp
new file mode 100644
index 000000000..6e3b652ea
--- /dev/null
+++ b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.cpp
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+const core::Property ElasticsearchCredentialsControllerService::Username = core::PropertyBuilder::createProperty("Username")
+    ->withDescription("The username for basic authentication")
+    ->supportsExpressionLanguage(true)
+    ->build();
+// Username and Password only count as a pair: onEnable() ignores them unless both are non-empty.
+const core::Property ElasticsearchCredentialsControllerService::Password = core::PropertyBuilder::createProperty("Password")
+    ->withDescription("The password for basic authentication")
+    ->supportsExpressionLanguage(true)
+    ->build();
+// Alternative to Username/Password: onEnable() throws unless exactly one of the two credential kinds is configured.
+const core::Property ElasticsearchCredentialsControllerService::ApiKey = core::PropertyBuilder::createProperty("API Key")
+    ->withDescription("The API Key to use")
+    ->build();
+
+
+void ElasticsearchCredentialsControllerService::initialize() {
+  setSupportedProperties(properties());  // properties() is the Username / Password / API Key list declared in the header
+}
+
+void ElasticsearchCredentialsControllerService::onEnable() {  // loads the credentials; exactly one kind must be configured
+  std::string user, secret;
+  getProperty(Username.getName(), user);
+  getProperty(Password.getName(), secret);
+  getProperty(ApiKey.getName(), api_key_);
+  if (!(user.empty() || secret.empty()))
+    username_password_.emplace(std::move(user), std::move(secret));
+  if (username_password_.has_value() == api_key_.has_value())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Either an API Key or Username and Password must be provided");
+}
+
+void ElasticsearchCredentialsControllerService::authenticateClient(utils::HTTPClient& client) {  // applies the stored credentials
+  gsl_Expects(api_key_.has_value() != username_password_.has_value());
+  if (!api_key_ && username_password_) {
+    client.setBasicAuth(username_password_->first, username_password_->second);
+  } else if (api_key_) {
+    client.appendHeader("Authorization", "ApiKey " + *api_key_);
+  }
+}
+
+REGISTER_RESOURCE(ElasticsearchCredentialsControllerService, ControllerService);
+}  // namespace org::apache::nifi::minifi::extensions::elasticsearch
diff --git a/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h
new file mode 100644
index 000000000..c17357f55
--- /dev/null
+++ b/extensions/elasticsearch/ElasticsearchCredentialsControllerService.h
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <string>
+#include <utility>
+#include <memory>
+
+#include "core/controller/ControllerService.h"
+#include "client/HTTPClient.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+class ElasticsearchCredentialsControllerService : public core::controller::ControllerService {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "Elasticsearch/Opensearch Credentials Controller Service";
+  // Supported properties; their definitions live in ElasticsearchCredentialsControllerService.cpp.
+  EXTENSIONAPI static const core::Property Username;
+  EXTENSIONAPI static const core::Property Password;
+  EXTENSIONAPI static const core::Property ApiKey;
+
+  static auto properties() {
+    return std::array{
+        Username,
+        Password,
+        ApiKey
+    };
+  }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+  using ControllerService::ControllerService;
+
+  void initialize() override;
+  // This service has no work of its own: yield() does nothing and no work is ever reported as available.
+  void yield() override {}
+
+  bool isWorkAvailable() override {
+    return false;
+  }
+  // "Running" means the service is in the ENABLED controller-service state.
+  bool isRunning() override {
+    return getState() == core::controller::ControllerServiceState::ENABLED;
+  }
+  // Reads the configured credentials; throws unless exactly one of API Key or Username+Password is set.
+  void onEnable() override;
+  // Applies the stored credentials to `client`: an "Authorization: ApiKey ..." header or HTTP basic auth.
+  void authenticateClient(utils::HTTPClient& client);
+
+ private:
+  std::optional<std::pair<std::string, std::string>> username_password_;  // set only when both values are non-empty
+  std::optional<std::string> api_key_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ElasticsearchCredentialsControllerService>::getLogger();
+};
+}  //  namespace org::apache::nifi::minifi::extensions::elasticsearch
diff --git a/extensions/elasticsearch/PostElasticsearch.cpp b/extensions/elasticsearch/PostElasticsearch.cpp
new file mode 100644
index 000000000..eeab912b1
--- /dev/null
+++ b/extensions/elasticsearch/PostElasticsearch.cpp
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+// NOTE(review): onTrigger() also sends the whole batch to Failure when the HTTP request itself fails; the Failure text says otherwise.
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->isRequired(true)
+    ->build();
+// Read once in onSchedule(); a value below 1 makes scheduling fail.
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+// Resolved in onSchedule(); scheduling fails when no matching controller service is found.
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+// Optional; looked up on every onTrigger() and handed to the HTTP client.
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+// Parsed in onSchedule(): more than one comma-separated entry is rejected there.
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->isRequired(true)
+    ->build();
+// Index and Identifier (like Action) are evaluated per flow file when the bulk entry is built.
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->isRequired(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());  // the property list declared in PostElasticsearch.h
+  setSupportedRelationships(relationships());  // Success, Failure and Error
+}
+
+namespace {
+std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(core::ProcessContext& context) {
+  // Resolves the service named by the "SSL Context Service" property; null when unset or of another type.
+  const auto service_name = context.getProperty(PostElasticsearch::SSLContext);
+  return service_name ? std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*service_name)) : nullptr;
+}
+
+std::shared_ptr<ElasticsearchCredentialsControllerService> getCredentialsService(core::ProcessContext& context) {
+  // Resolves the service named by "Elasticsearch Credentials Provider Service"; null when unset or of another type.
+  const auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials);
+  return service_name ? std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*service_name)) : nullptr;
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  // Validates and caches the configuration; every problem is reported as a PROCESS_SCHEDULE_EXCEPTION.
+  gsl_Expects(context);
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+  const auto hosts_str = context->getProperty(Hosts);
+  if (!hosts_str)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  const auto hosts = utils::StringUtils::split(*hosts_str, ",");
+  if (hosts.size() > 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+  // Reject an empty list or a blank entry before indexing: it would be an out-of-bounds read or an unusable URL.
+  if (hosts.empty() || hosts.front().empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  host_url_ = hosts.front();
+  credentials_service_ = getCredentialsService(*context);
+  if (!credentials_service_)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  // Serializes the bulk entry: the action line, followed by the document line when the action carries one.
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  // Builds the entry for one flow file from the Action, Index and Identifier properties (evaluated against the
+  // flow file) and, except for "delete", the flow file's JSON content. Returns an error message on failure.
+  static nonstd::expected<ElasticPayload, std::string> parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (id && id->empty())
+      id.reset();  // an empty Identifier counts as "not provided", as the property description promises
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0)
+        return nonstd::make_unexpected("invalid flowfile content");
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0)
+        return nonstd::make_unexpected("invalid flowfile content");
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // The bulk API has no "upsert" action: send "update" with doc_as_upsert placed next to "doc", not inside it.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  // Produces the action line, e.g. {"index":{"_index":"<index>","_id":"<id>"}}; "_id" is omitted when no id is set.
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+    auto index_json = rapidjson::Value(index_.data(), index_.size());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+nonstd::expected<rapidjson::Document, std::string> submitRequest(utils::HTTPClient& client, std::string&& payload, const size_t expected_items) {
+  client.setPostFields(std::move(payload));
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+// Flattens one member of a response item into flow file attributes named "<name>.<member>[.<nested member>...]".
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != object->value.MemberEnd(); ++it)
+      addAttributesFromResponse(name, it, flow_file);
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  } else if (object->value.IsDouble()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetDouble()));
+  } else {
+    // GetType() yields an enum; passing it to %s is undefined behaviour, so its numeric value is logged instead.
+    core::logging::LoggerFactory<PostElasticsearch>::getLogger()->log_error("Unexpected %d in response json", static_cast<int>(object->value.GetType()));
+  }
+}
+
+void processResponseFromElastic(const rapidjson::Document& response, core::ProcessSession& session, const std::vector<std::shared_ptr<core::FlowFile>>& flowfiles_sent) {
+  gsl_Expects(response.HasMember("items"));
+  auto& items = response["items"];
+  gsl_Expects(items.IsArray());
+  gsl_Expects(items.Size() == flowfiles_sent.size());
+  for (size_t i = 0; i < items.Size(); ++i) {
+    gsl_Expects(items[i].IsObject());
+    for (auto it = items[i].MemberBegin(); it != items[i].MemberEnd(); ++it) {
+      addAttributesFromResponse("elasticsearch", it, *flowfiles_sent[i]);
+    }
+    if (items[i].MemberBegin()->value.HasMember("error"))
+      session.transfer(flowfiles_sent[i], PostElasticsearch::Error);
+    else
+      session.transfer(flowfiles_sent[i], PostElasticsearch::Success);
+  }
+}
+}  // namespace
+
+// Takes up to max_batch_size_ flow files and joins their bulk entries; unparsable flow files are sent to Failure.
+std::string PostElasticsearch::collectPayload(core::ProcessContext& context,
+                                              core::ProcessSession& session,
+                                              std::vector<std::shared_ptr<core::FlowFile>>& flowfiles_with_payload) const {
+  std::string payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session.get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(session, context, flow_file);
+    if (!elastic_payload) {
+      logger_->log_error("%s", elastic_payload.error().c_str());  // the message is data, never the format string
+      session.transfer(flow_file, PostElasticsearch::Failure);
+      continue;
+    }
+    payload.append(elastic_payload->toString()).append("\n");
+    flowfiles_with_payload.push_back(flow_file);
+  }
+  return payload;
+}
+
+// Sends one batch of flow files to <host>/_bulk and routes each of them according to the outcome.
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_with_payload;
+  auto payload = collectPayload(*context, *session, flowfiles_with_payload);
+  if (flowfiles_with_payload.empty())
+    return;  // nothing to send, so no HTTP client needs to be set up
+
+  utils::HTTPClient client;
+  client.initialize("POST", host_url_ + "/_bulk", getSSLContextService(*context));
+  client.setContentType("application/json");
+  credentials_service_->authenticateClient(client);
+
+  auto result = submitRequest(client, std::move(payload), flowfiles_with_payload.size());
+  if (!result) {
+    // The message can embed the server's response body, so it is passed as an argument, never as the format string.
+    logger_->log_error("%s", result.error().c_str());
+    for (const auto& flow_file_in_payload: flowfiles_with_payload)
+      session->transfer(flow_file_in_payload, Failure);
+    return;
+  }
+
+  processResponseFromElastic(*result, *session, flowfiles_with_payload);
+}
+
+REGISTER_RESOURCE(PostElasticsearch, Processor);
+
+}  // namespace org::apache::nifi::minifi::extensions::elasticsearch
diff --git a/extensions/elasticsearch/PostElasticsearch.h b/extensions/elasticsearch/PostElasticsearch.h
new file mode 100644
index 000000000..5badaf431
--- /dev/null
+++ b/extensions/elasticsearch/PostElasticsearch.h
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <string>
+#include <memory>
+#include <vector>
+
+#include "controllers/SSLContextService.h"
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/Processor.h"
+#include "utils/Enum.h"
+#include "client/HTTPClient.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+class PostElasticsearch : public core::Processor {
+ public:
+  EXTENSIONAPI static constexpr const char* Description = "An Elasticsearch/Opensearch post processor that uses the Elasticsearch/Opensearch _bulk REST API.";
+  // Takes the processor's name and an optional UUID, both forwarded to core::Processor.
+  explicit PostElasticsearch(const std::string& name, const utils::Identifier& uuid = {})
+      : Processor(name, uuid) {
+  }
+  ~PostElasticsearch() override = default;
+  // Supported properties; defined, together with their descriptions, in PostElasticsearch.cpp.
+  EXTENSIONAPI static const core::Property Action;
+  EXTENSIONAPI static const core::Property MaxBatchSize;
+  EXTENSIONAPI static const core::Property ElasticCredentials;
+  EXTENSIONAPI static const core::Property SSLContext;
+  EXTENSIONAPI static const core::Property Hosts;
+  EXTENSIONAPI static const core::Property Index;
+  EXTENSIONAPI static const core::Property Identifier;
+
+  static auto properties() {
+    return std::array{
+      Action,
+      MaxBatchSize,
+      ElasticCredentials,
+      SSLContext,
+      Hosts,
+      Index,
+      Identifier
+    };
+  }
+  // Success: item accepted; Error: the response item contains "error"; Failure: entry could not be built or the request failed.
+  EXTENSIONAPI static const core::Relationship Success;
+  EXTENSIONAPI static const core::Relationship Failure;
+  EXTENSIONAPI static const core::Relationship Error;
+
+  static auto relationships() {
+    return std::array{Success, Failure, Error};
+  }
+
+  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;
+
+  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
+  // initialize() registers properties and relationships, onSchedule() validates the configuration, onTrigger() sends one batch.
+  void initialize() override;
+  void onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) override;
+  void onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override;
+
+ private:
+  std::string collectPayload(core::ProcessContext&, core::ProcessSession&, std::vector<std::shared_ptr<core::FlowFile>>&) const;
+  // Values cached by onSchedule() for use in onTrigger().
+  uint64_t max_batch_size_ = 100;
+  std::string host_url_;
+  std::shared_ptr<ElasticsearchCredentialsControllerService> credentials_service_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PostElasticsearch>::getLogger();
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::elasticsearch
diff --git a/extensions/elasticsearch/tests/CMakeLists.txt b/extensions/elasticsearch/tests/CMakeLists.txt
new file mode 100644
index 000000000..e53b6d4d9
--- /dev/null
+++ b/extensions/elasticsearch/tests/CMakeLists.txt
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB ELASTICSEARCH_TESTS  "*.cpp")
+# Each .cpp file in this directory becomes its own executable, named after the file, and is registered with add_test().
+SET(ELASTICSEARCH_TEST_COUNT 0)
+FOREACH(testfile ${ELASTICSEARCH_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/elasticsearch")
+    target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/libminifi/test/")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/civetweb/")
+    target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/http-curl/")
+    # createTests() is defined elsewhere in the build scripts; the libraries below are what each test links against.
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    target_link_libraries(${testfilename} minifi-elasticsearch)
+    target_link_libraries(${testfilename} minifi-civet-extensions)
+    target_link_libraries(${testfilename} minifi-http-curl)
+    target_link_libraries(${testfilename} minifi-standard-processors)
+    MATH(EXPR ELASTICSEARCH_TEST_COUNT "${ELASTICSEARCH_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${ELASTICSEARCH_TEST_COUNT} Elasticsearch related test file(s)...")
diff --git a/extensions/elasticsearch/tests/MockElastic.h b/extensions/elasticsearch/tests/MockElastic.h
new file mode 100644
index 000000000..aced8c244
--- /dev/null
+++ b/extensions/elasticsearch/tests/MockElastic.h
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include <CivetServer.h>
+#include "tests/CivetLibrary.h"
+#include "core/logging/Logger.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "rapidjson/document.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/stringbuffer.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+// Lets a request through only when its Authorization header carries the expected basic-auth or API-key credentials.
+class MockElasticAuthHandler : public CivetAuthHandler {
+ public:
+  static constexpr const char* API_KEY = "VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw";
+  static constexpr const char* USERNAME = "elastic";
+  static constexpr const char* PASSWORD = "elastic_password";
+
+ private:
+  bool authorize(CivetServer*, struct mg_connection* conn) override {
+    const char* authHeader = mg_get_header(conn, "Authorization");
+    if (authHeader == nullptr)
+      return false;
+    const std::string received{authHeader};  // std::string comparison: no reliance on strcmp, whose header is not included
+    // The basic-auth token is the base64 encoding of "USERNAME:PASSWORD".
+    if (received == "Basic ZWxhc3RpYzplbGFzdGljX3Bhc3N3b3Jk")
+      return true;
+    return received == std::string("ApiKey ") + API_KEY;  // built from API_KEY so the constant and the check cannot drift apart
+  }
+};
+
+class BulkElasticHandler : public CivetHandler {
+ public:
+  void returnErrors(bool ret_errors) {
+    ret_error_ = ret_errors;
+  }
+
+ private:
+  rapidjson::Value addIndexSuccess(rapidjson::Document::AllocatorType& alloc) {  // canned item for a successful non-update action
+    rapidjson::Value item{rapidjson::kObjectType};
+    rapidjson::Value operation{rapidjson::kObjectType};
+    operation.AddMember("_index", "test", alloc);
+    operation.AddMember("_id", "1", alloc);
+    operation.AddMember("result", "created", alloc);
+    item.AddMember("index", operation, alloc);
+    return item;
+  }
+  // Canned item for a successful "update" action.
+  rapidjson::Value addUpdateSuccess(rapidjson::Document::AllocatorType& alloc) {
+    rapidjson::Value item{rapidjson::kObjectType};
+    rapidjson::Value operation{rapidjson::kObjectType};
+    operation.AddMember("_index", "test", alloc);
+    operation.AddMember("_id", "1", alloc);
+    operation.AddMember("result", "updated", alloc);
+    item.AddMember("update", operation, alloc);
+    return item;
+  }
+  // Canned "update" item carrying an "error" object; used for every action line when errors are enabled.
+  rapidjson::Value addUpdateError(rapidjson::Document::AllocatorType& alloc) {
+    rapidjson::Value item{rapidjson::kObjectType};
+    rapidjson::Value operation{rapidjson::kObjectType};
+    operation.AddMember("_index", "test", alloc);
+    operation.AddMember("_id", "1", alloc);
+    rapidjson::Value error{rapidjson::kObjectType};
+    error.AddMember("type", "document_missing_exception", alloc);
+    error.AddMember("reason", "[6]: document missing", alloc);
+    error.AddMember("index_uuid", "aAsFqTI0Tc2W0LCWgPNrOA", alloc);
+    error.AddMember("shard", "0", alloc);
+    error.AddMember("index", "index", alloc);
+    operation.AddMember("error", error, alloc);
+    item.AddMember("update", operation, alloc);
+    return item;
+  }
+  // Replies to a _bulk request with one canned item per action line found in the request body.
+  bool handlePost(CivetServer*, struct mg_connection* conn) override {
+    char request[2048];  // NOTE: read once; a body longer than this buffer is truncated
+    const int chars_read = mg_read(conn, request, sizeof(request));
+    if (chars_read < 0)  // mg_read() reports errors as negative values, which must not be used as a length
+      return false;
+    std::vector<std::string> lines = utils::StringUtils::splitRemovingEmpty(std::string(request, static_cast<size_t>(chars_read)), "\n");
+    rapidjson::Document response{rapidjson::kObjectType};
+    response.AddMember("took", 30, response.GetAllocator());
+    response.AddMember("errors", ret_error_, response.GetAllocator());
+    response.AddMember("items", rapidjson::kArrayType, response.GetAllocator());
+    auto& items = response["items"];
+    for (const auto& line : lines) {
+      rapidjson::Document line_json;
+      line_json.Parse<rapidjson::kParseStopWhenDoneFlag>(line.data());
+      // HasMember() is only valid on objects, so skip lines that failed to parse or are not JSON objects.
+      if (line_json.HasParseError() || !line_json.IsObject())
+        continue;
+      // Lines without an action key are document lines and produce no response item.
+      if (!line_json.HasMember("index") && !line_json.HasMember("create") && !line_json.HasMember("update") && !line_json.HasMember("delete"))
+        continue;
+      if (ret_error_) {
+        items.PushBack(addUpdateError(response.GetAllocator()), response.GetAllocator());
+      } else {
+        if (line_json.HasMember("update"))
+          items.PushBack(addUpdateSuccess(response.GetAllocator()), response.GetAllocator());
+        else
+          items.PushBack(addIndexSuccess(response.GetAllocator()), response.GetAllocator());
+      }
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    response.Accept(writer);
+
+    mg_printf(conn, "HTTP/1.1 200 OK\r\n");
+    mg_printf(conn, "Content-length: %zu", buffer.GetSize());
+    mg_printf(conn, "\r\n\r\n");
+    mg_printf(conn, "%s", buffer.GetString());
+    return true;
+  }
+
+  bool ret_error_ = false;
+};
+
+// Minimal stand-in for an Elasticsearch node: a CivetServer exposing an authenticated "/_bulk" endpoint.
+class MockElastic {
+ public:
+  explicit MockElastic(std::string port) : port_(std::move(port)) {
+    std::vector<std::string> options;
+    options.emplace_back("listening_ports");
+    options.emplace_back(port_);
+    server_ = std::make_unique<CivetServer>(options, &callbacks_, &logger_);
+    bulk_handler_ = std::make_unique<BulkElasticHandler>();
+    server_->addHandler("/_bulk", *bulk_handler_);
+
+    auth_handler_ = std::make_unique<MockElasticAuthHandler>();
+    server_->addAuthHandler("/_bulk", *auth_handler_);
+  }
+
+  [[nodiscard]] const std::string& getPort() const {
+    return port_;
+  }
+
+  void returnErrors(bool ret_errors) {
+    bulk_handler_->returnErrors(ret_errors);
+  }
+
+ private:
+  CivetLibrary lib_;
+  std::string port_;
+  CivetCallbacks callbacks_;
+  std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = org::apache::nifi::minifi::core::logging::LoggerFactory<MockElastic>::getLogger();
+  std::unique_ptr<BulkElasticHandler> bulk_handler_;
+  std::unique_ptr<MockElasticAuthHandler> auth_handler_;
+  // Declared last so it is destroyed first, i.e. before the handlers, callbacks and logger whose addresses it was given.
+  std::unique_ptr<CivetServer> server_;
+};
+
+}  // namespace org::apache::nifi::minifi::extensions::elasticsearch::test
diff --git a/extensions/elasticsearch/tests/PostElasticsearchTests.cpp b/extensions/elasticsearch/tests/PostElasticsearchTests.cpp
new file mode 100644
index 000000000..0479da807
--- /dev/null
+++ b/extensions/elasticsearch/tests/PostElasticsearchTests.cpp
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+// Runs PostElasticsearch against a MockElastic instance listening on port 10433.
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  auto post_elasticsearch_json = std::make_shared<PostElasticsearch>("PostElasticsearch");
+  minifi::test::SingleProcessorTestController test_controller{post_elasticsearch_json};
+  auto elasticsearch_credentials_controller_service = test_controller.plan->addController("ElasticsearchCredentialsControllerService", "elasticsearch_credentials_controller_service");
+  CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                     PostElasticsearch::ElasticCredentials.getName(),
+                                     "elasticsearch_credentials_controller_service"));
+  CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                    PostElasticsearch::Hosts.getName(), "localhost:10433"));
+  CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                    PostElasticsearch::Action.getName(),
+                                    "${elastic_action}"));  // each flow file supplies its action via the elastic_action attribute
+  CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                    PostElasticsearch::Index.getName(),
+                                    "test_index"));
+
+  SECTION("Index with valid basic authentication") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Password.getName(),
+                                            MockElasticAuthHandler::PASSWORD));
+
+    auto results = test_controller.trigger({{R"({"field1":"value1"})", {{"elastic_action", "index"}}},
+                                            {R"({"field1":"value2"})", {{"elastic_action", "index"}}}});
+    REQUIRE(results[PostElasticsearch::Success].size() == 2);
+    for (const auto& result : results[PostElasticsearch::Success]) {
+      auto attributes = result->getAttributes();
+      CHECK(attributes.contains("elasticsearch.index._id"));
+      CHECK(attributes.contains("elasticsearch.index._index"));
+    }
+  }
+
+  SECTION("Update with valid ApiKey") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            MockElasticAuthHandler::API_KEY));
+    CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                            PostElasticsearch::Identifier.getName(),
+                                            "${filename}"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"})", {{"elastic_action", "upsert"}});
+    REQUIRE(results[PostElasticsearch::Success].size() == 1);
+    auto attributes = results[PostElasticsearch::Success][0]->getAttributes();
+    CHECK(attributes.contains("elasticsearch.update._id"));
+    CHECK(attributes.contains("elasticsearch.update._index"));
+  }
+
+  SECTION("Update error") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            MockElasticAuthHandler::API_KEY));
+    CHECK(test_controller.plan->setProperty(post_elasticsearch_json,
+                                            PostElasticsearch::Identifier.getName(),
+                                            "${filename}"));
+    mock_elastic.returnErrors(true);  // switch the mock's bulk handler into its error-returning mode
+    auto results = test_controller.trigger(R"({"field1":"value1"})", {{"elastic_action", "upsert"}});
+    REQUIRE(results[PostElasticsearch::Error].size() == 1);
+    auto attributes = results[PostElasticsearch::Error][0]->getAttributes();
+    CHECK(attributes.contains("elasticsearch.update._id"));
+    CHECK(attributes.contains("elasticsearch.update._index"));
+    CHECK(attributes.contains("elasticsearch.update.error.type"));
+    CHECK(attributes.contains("elasticsearch.update.error.reason"));
+  }
+
+  SECTION("Invalid ApiKey") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            "invalid_api_key"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"})", {{"elastic_action", "create"}});
+    CHECK(results[PostElasticsearch::Failure].size() == 1);
+  }
+
+  SECTION("Invalid basic authentication") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Password.getName(),
+                                            "wrong_password"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"})", {{"elastic_action", "index"}});
+    CHECK(results[PostElasticsearch::Failure].size() == 1);
+  }
+}
+
+}  // namespace org::apache::nifi::minifi::extensions::elasticsearch::test
diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp
index dd28e47be..382087244 100644
--- a/extensions/http-curl/client/HTTPClient.cpp
+++ b/extensions/http-curl/client/HTTPClient.cpp
@@ -31,10 +31,10 @@
 
 namespace org::apache::nifi::minifi::utils {
 
-HTTPClient::HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)
+HTTPClient::HTTPClient(std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service)  // url is taken by value so it can be moved into url_
     : core::Connectable("HTTPClient"),
       ssl_context_service_(ssl_context_service),
-      url_(url) {
+      url_(std::move(url)) {  // the body then creates the curl easy handle used by this client
   http_session_ = curl_easy_init();
 }
 
@@ -135,6 +135,14 @@ void HTTPClient::setDisableHostVerification() {
   curl_easy_setopt(http_session_, CURLOPT_SSL_VERIFYHOST, 0L);
 }
 
+void HTTPClient::setBasicAuth(std::string username, std::string password) {  // stores the credentials; submit() applies them to the curl handle
+  username_password_.emplace(std::move(username), std::move(password));  // replaces any previously stored pair
+}
+
+void HTTPClient::clearBasicAuth() {  // drops the stored credentials so later submit() calls no longer set them
+  username_password_.reset();  // NOTE(review): CURLOPT_USERNAME/PASSWORD already applied to http_session_ by an earlier submit() are not cleared here - confirm the handle is not reused
+}
+
 bool HTTPClient::setSpecificSSLVersion(SSLVersion specific_version) {
 #if CURL_AT_LEAST_VERSION(7, 54, 0)
   // bitwise or of different enum types is deprecated in C++20, but the curl api explicitly supports ORing one of CURL_SSLVERSION and one of CURL_SSLVERSION_MAX
@@ -269,6 +277,11 @@ bool HTTPClient::submit() {
     curl_easy_setopt(http_session_, CURLOPT_HTTPHEADER, headers_);
   }
 
+  if (username_password_) {
+    curl_easy_setopt(http_session_, CURLOPT_USERNAME, username_password_->username.c_str());
+    curl_easy_setopt(http_session_, CURLOPT_PASSWORD, username_password_->password.c_str());
+  }
+
   curl_easy_setopt(http_session_, CURLOPT_URL, url_.c_str());
   logger_->log_debug("Submitting to %s", url_);
   if (callback == nullptr) {
@@ -325,7 +338,7 @@ const char *HTTPClient::getContentType() {
 }
 
 const std::vector<char> &HTTPClient::getResponseBody() {
-  if (response_body_.size() == 0) {
+  if (response_body_.empty()) {
     if (callback && callback->ptr) {
       response_body_ = callback->ptr->to_string();
     } else {
@@ -437,7 +450,7 @@ void HTTPClient::setFollowRedirects(bool follow) {
 }
 
 bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
-  if (field_name.size() == 0) {
+  if (field_name.empty()) {
     return false;
   }
 
@@ -452,7 +465,7 @@ bool HTTPClient::isValidHttpHeaderField(std::string_view field_name) {
 }
 
 std::string HTTPClient::replaceInvalidCharactersInHttpHeaderFieldName(std::string_view field_name) {
-  if (field_name.size() == 0) {
+  if (field_name.empty()) {
     return "X-MiNiFi-Empty-Attribute-Name";
   }
 
diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h
index 29f60b0e7..934baf1b3 100644
--- a/extensions/http-curl/client/HTTPClient.h
+++ b/extensions/http-curl/client/HTTPClient.h
@@ -38,6 +38,7 @@
 #include <map>
 #include <chrono>
 #include <string>
+#include <utility>
 #ifdef WIN32
 #include <regex>
 #else
@@ -70,7 +71,7 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
   HTTPClient(const HTTPClient&) = delete;
   HTTPClient& operator=(const HTTPClient&) = delete;
 
-  explicit HTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
+  explicit HTTPClient(std::string url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr);
 
   ~HTTPClient() override;
 
@@ -138,6 +139,9 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   void setDisableHostVerification() override;
 
+  void setBasicAuth(std::string username, std::string password) override;
+  void clearBasicAuth() override;
+
   bool setSpecificSSLVersion(SSLVersion specific_version) override;
 
   bool setMinimumSSLVersion(SSLVersion minimum_version) override;
@@ -297,6 +301,15 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   std::chrono::milliseconds keep_alive_idle_{-1};
 
+  struct BasicAuthCredentials {  // username/password pair kept for setBasicAuth(); read by submit()
+    BasicAuthCredentials(std::string username, std::string password) : username(std::move(username)), password(std::move(password)) {}  // sink constructor: both strings are moved into the members
+
+    std::string username;
+    std::string password;
+  };
+
+  std::optional<BasicAuthCredentials> username_password_;
+
   std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HTTPClient>::getLogger()};
 };
 
diff --git a/extensions/http-curl/tests/CivetLibrary.h b/extensions/http-curl/tests/CivetLibrary.h
new file mode 100644
index 000000000..ffb70b522
--- /dev/null
+++ b/extensions/http-curl/tests/CivetLibrary.h
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include "civetweb.h"
+
+// Counted guard around civetweb's library init/exit: the first live instance calls mg_init_library(0), the last one destroyed calls mg_exit_library().
+class CivetLibrary {
+ public:
+  CivetLibrary() {
+    const bool is_first_instance = getCounter().fetch_add(1) == 0;
+    if (is_first_instance) mg_init_library(0);
+  }
+  ~CivetLibrary() {
+    const bool is_last_instance = getCounter().fetch_sub(1) == 1;
+    if (is_last_instance) mg_exit_library();
+  }
+ private:
+  // Shared count of live CivetLibrary objects, held in a function-local static created on first use.
+  static std::atomic<int>& getCounter() {
+    static std::atomic<int> live_instances{0};
+    return live_instances;
+  }
+};
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 12f959129..f690789f8 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 #include "civetweb.h"
+#include "CivetLibrary.h"
 #include "CivetServer.h"
 #include "HTTPUtils.h"
 #include "ServerAwareHandler.h"
@@ -31,24 +32,6 @@
  * initiated it might get stuck inside worker_thread_run > consume_socket)
  */
 class TestServer{
-  struct CivetLibrary{
-    CivetLibrary() {
-      if (getCounter()++ == 0) {
-        mg_init_library(0);
-      }
-    }
-    ~CivetLibrary() {
-      if (--getCounter() == 0) {
-        mg_exit_library();
-      }
-    }
-   private:
-    static std::atomic<int>& getCounter() {
-      static std::atomic<int> counter{0};
-      return counter;
-    }
-  };
-
  public:
   TestServer(std::string &port, std::string &rooturi, CivetHandler *handler, CivetCallbacks *callbacks, std::string& cert, std::string &ca_cert) {
     if (!mg_check_feature(2)) {
diff --git a/extensions/splunk/tests/MockSplunkHEC.h b/extensions/splunk/tests/MockSplunkHEC.h
index 0b6351b03..a83014ea8 100644
--- a/extensions/splunk/tests/MockSplunkHEC.h
+++ b/extensions/splunk/tests/MockSplunkHEC.h
@@ -22,6 +22,7 @@
 #include <utility>
 #include <vector>
 #include <CivetServer.h>
+#include "tests/CivetLibrary.h"
 #include "core/logging/Logger.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rapidjson/document.h"
@@ -173,24 +174,6 @@ class AckIndexerHandler : public MockSplunkHandler {
 };
 
 class MockSplunkHEC {
-  struct CivetLibrary{
-    CivetLibrary() {
-      if (getCounter()++ == 0) {
-        mg_init_library(0);
-      }
-    }
-    ~CivetLibrary() {
-      if (--getCounter() == 0) {
-        mg_exit_library();
-      }
-    }
-   private:
-    static std::atomic<int>& getCounter() {
-      static std::atomic<int> counter{0};
-      return counter;
-    }
-  };
-
  public:
   static constexpr const char* TOKEN = "Splunk 822f7d13-2b70-4f8c-848b-86edfc251222";
 
diff --git a/extensions/standard-processors/tests/unit/ProcessorTests.cpp b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
index 65939a2d0..0c4001f38 100644
--- a/extensions/standard-processors/tests/unit/ProcessorTests.cpp
+++ b/extensions/standard-processors/tests/unit/ProcessorTests.cpp
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#define EXTENSION_LIST "*minifi-*,!*http-curl*,!*coap*,!*splunk*"
+#define EXTENSION_LIST "*minifi-*,!*http-curl*,!*coap*,!*splunk*,!*elastic*"
 
 #include <cstdio>
 #include <utility>
diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h
index 91dc71394..4a171de65 100644
--- a/libminifi/include/utils/HTTPClient.h
+++ b/libminifi/include/utils/HTTPClient.h
@@ -341,6 +341,9 @@ class BaseHTTPClient {
 
   virtual void setHTTPProxy(const utils::HTTPProxy &proxy) = 0;
 
+  virtual void setBasicAuth(std::string username, std::string password) = 0;
+  virtual void clearBasicAuth() = 0;
+
   virtual void setDisableHostVerification() = 0;
 
   virtual bool setSpecificSSLVersion(SSLVersion specific_version) = 0;
diff --git a/libminifi/include/utils/JsonCallback.h b/libminifi/include/utils/JsonCallback.h
index 03887704d..99bb07240 100644
--- a/libminifi/include/utils/JsonCallback.h
+++ b/libminifi/include/utils/JsonCallback.h
@@ -33,6 +33,26 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
+// Stream callback that reads the whole stream and parses it as JSON into the referenced document.
+// operator() returns -1 when the read or the parse fails, otherwise the value returned by read().
+class JsonInputCallback {
+ public:
+  explicit JsonInputCallback(rapidjson::Document& document) : document_(document) {}
+  int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) {
+    std::string buffer(stream->size(), '\0');
+    const auto bytes_read = stream->read(gsl::make_span(buffer).as_span<std::byte>());
+    if (io::isError(bytes_read)) return -1;
+
+    // kParseStopWhenDoneFlag: parsing stops once the first complete root value has been read.
+    const rapidjson::ParseResult outcome = document_.Parse<rapidjson::kParseStopWhenDoneFlag>(buffer.data());
+    if (outcome.IsError()) return -1;
+
+    return bytes_read;
+  }
+ private:
+  rapidjson::Document& document_;
+};
+
 class JsonOutputCallback {
  public:
   explicit JsonOutputCallback(rapidjson::Document&& root, std::optional<uint8_t> decimal_places)
diff --git a/libminifi/test/SingleProcessorTestController.h b/libminifi/test/SingleProcessorTestController.h
index 0cc22be37..70c163d30 100644
--- a/libminifi/test/SingleProcessorTestController.h
+++ b/libminifi/test/SingleProcessorTestController.h
@@ -31,6 +31,11 @@
 using namespace std::chrono_literals;
 
 namespace org::apache::nifi::minifi::test {
+struct InputFlowFileData {  // content and attributes of one flow file to enqueue before triggering the processor
+  std::string_view content;  // non-owning view; the caller keeps the underlying characters alive
+  std::unordered_map<std::string, std::string> attributes = {};  // defaults to no attributes
+};
+
 
 using ProcessorTriggerResult = std::unordered_map<core::Relationship, std::vector<std::shared_ptr<core::FlowFile>>>;
 
@@ -57,9 +62,19 @@ class SingleProcessorTestController : public TestController {
     return result;
   }
 
+  auto trigger(InputFlowFileData&& input_flow_file_data) {  // enqueues one flow file built from the given data, then runs the no-argument trigger()
+    input_->put(createFlowFile(input_flow_file_data.content, std::move(input_flow_file_data.attributes)));
+    return trigger();
+  }
+
   auto trigger(const std::string_view input_flow_file_content, std::unordered_map<std::string, std::string> input_flow_file_attributes = {}) {
-    const auto new_flow_file = createFlowFile(input_flow_file_content, std::move(input_flow_file_attributes));
-    input_->put(new_flow_file);
+    return trigger({input_flow_file_content, std::move(input_flow_file_attributes)});  // brace-initializes the argument and delegates to another trigger overload (presumably the InputFlowFileData&& one - confirm)
+  }
+
+  auto trigger(std::vector<InputFlowFileData>&& input_flow_file_datas) {  // enqueues every entry in order, then runs the no-argument trigger() once
+    for (auto& input_flow_file_data : std::move(input_flow_file_datas)) {  // std::move is only a cast here; the elements are still visited in place
+      input_->put(createFlowFile(input_flow_file_data.content, std::move(input_flow_file_data.attributes)));
+    }
     return trigger();
   }
 
diff --git a/win_build_vs.bat b/win_build_vs.bat
index 35d3d1f02..c91d210fe 100755
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -60,6 +60,7 @@ for %%x in (%*) do (
     if [%%~x] EQU [/PDH]         set build_PDH=ON
     if [%%~x] EQU [/SPLUNK]      set build_SPLUNK=ON
     if [%%~x] EQU [/GCP]         set build_GCP=ON
+    if [%%~x] EQU [/ELASTIC]     set build_ELASTIC=ON
     if [%%~x] EQU [/M]           set installer_merge_modules=ON
     if [%%~x] EQU [/Z]           set build_azure=ON
     if [%%~x] EQU [/N]           set build_nanofi=ON
@@ -77,7 +78,7 @@ for %%x in (%*) do (
 mkdir %builddir%
 pushd %builddir%\
 
-cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
+cmake -G %generator% -A %build_platform% -DINSTALLER_MERGE_MODULES=%installer_merge_modules% -DTEST_CUSTOM_WEL_PROVIDER=%test_custom_wel_provider% -DENABLE_SQL=%build_SQL% -DUSE_REAL_ODBC_TEST_DRIVER=%real_odbc% -DCMAKE_BUILD_TYPE_INIT=%cmake_build_type% -DCMAKE_BUILD_TYPE=%cmake_build_type% -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=%build_kafka% -DENABLE_JNI=%build_jni% -DOPENSSL_OFF=OFF -DENABLE_COAP=%build_coap% -DENABLE_AWS=%build_AWS% -DENABLE_PDH=%build_PDH% -DENABLE_AZURE=%build_azure% -D [...]
 IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%
 if [%cpack%] EQU [ON] (
     cpack -C %cmake_build_type%