You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/06/20 11:39:47 UTC

[GitHub] [nifi-minifi-cpp] martinzink commented on a diff in pull request #1349: MINIFICPP-1843 Implement PostElasticsearch

martinzink commented on code in PR #1349:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1349#discussion_r901568353


##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+// Relationships a processed flowfile can be routed to.
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+// Bulk operation to perform; evaluated per flowfile (expression language).
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Upper bound on the number of flowfiles collected into a single _bulk request per trigger.
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, this property must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+/// Registers the processor's properties and relationships.
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+/// Resolves the "SSL Context Service" property to a controller service.
+/// Yields an empty pointer when the property is unset or the lookup/cast produces nothing.
+std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(core::ProcessContext& context) {
+  auto service_name = context.getProperty(PostElasticsearch::SSLContext);
+  if (!service_name)
+    return nullptr;
+  auto service = context.getControllerService(*service_name);
+  return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
+}
+
+/// Resolves the credentials provider property to an ElasticsearchCredentialsControllerService.
+/// Yields an empty pointer when the property is unset or the lookup/cast produces nothing.
+std::shared_ptr<ElasticsearchCredentialsControllerService> getCredentialsService(core::ProcessContext& context) {
+  auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials);
+  if (!service_name)
+    return nullptr;
+  auto service = context.getControllerService(*service_name);
+  return std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(service);
+}
+}  // namespace
+
+/// Reads the scheduling-time configuration and prepares the HTTP client for the "<host>/_bulk" endpoint.
+/// Throws PROCESS_SCHEDULE_EXCEPTION when Max Batch Size is below 1, when Hosts is missing, empty or
+/// lists more than one host, or when the Elasticsearch credentials service cannot be resolved.
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  auto hosts_str = context->getProperty(Hosts);
+  if (!hosts_str)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  auto hosts = utils::StringUtils::split(*hosts_str, ",");
+  if (hosts.size() > 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+  // An empty property value must neither index past the end of `hosts` (if split yields no element)
+  // nor result in a request to the bare "/_bulk" path.
+  if (hosts.empty() || hosts[0].empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  const std::string host_url = hosts[0];
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+/// A single operation of an Elasticsearch _bulk request: an action/metadata header line and,
+/// for every action except "delete", a source line built from the flowfile's JSON content.
+class ElasticPayload {
+ public:
+  /// Serializes the operation as one or two NDJSON lines. No trailing newline is emitted;
+  /// the caller terminates the operation.
+  [[nodiscard]] std::string toString() const {
+    std::string result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      // Append in place instead of concatenating temporaries.
+      result += '\n';
+      result += payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  /// Builds the bulk operation for `flow_file`.
+  ///
+  /// The Action, Index and Identifier properties are evaluated against the flowfile. For
+  /// "index"/"create" the flowfile content is sent as the document; for "update"/"upsert" it is
+  /// wrapped as {"doc": <content>}, and "upsert" is sent as an "update" with "doc_as_upsert": true.
+  ///
+  /// @return the payload, or an error message when the action, index, identifier or content is invalid
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index || index->empty())
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    // Per the Identifier property description, an empty value means "no identifier": index/create
+    // then let Elasticsearch generate one instead of sending an empty "_id"; other actions reject it.
+    if (id && id->empty())
+      id.reset();
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      // Allocates from the payload's allocator so its contents stay valid once moved into the payload.
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // The bulk API has no "upsert" action: it is an "update" whose source line carries
+        // "doc_as_upsert" next to "doc". Inside "doc" it would be stored as a document field.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  /// Builds the action/metadata line, e.g. {"index":{"_index":"my_index","_id":"1"}}.
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;  // bulk action name: "index", "create", "delete" or "update"
+  std::string index_;
+  std::optional<std::string> id_;  // omitted from the header when absent
+  std::optional<rapidjson::Document> payload_;  // source line; absent for "delete"
+};
+
+/// Submits the prepared _bulk request and validates the response: HTTP 200, a JSON object whose
+/// "items" member is an array with exactly `expected_items` entries (one per operation sent).
+/// @return the parsed response document, or an error message describing the first check that failed
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  // Treat the body as a sized buffer: .data() alone does not guarantee a terminating '\0'
+  // (e.g. for a std::vector<char>), so never pass it on as a C string.
+  const auto& response_body = client.getResponseBody();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + std::string(response_body.begin(), response_body.end()));
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  // HasMember/Size assert on values of the wrong type, so check the shape before using them.
+  if (!response.IsObject() || !response.HasMember("items") || !response["items"].IsArray())
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+/// Flattens one member of an Elasticsearch response into flowfile attributes. Nested objects are
+/// descended into with '.'-joined names; integer, string and boolean leaves become attributes.
+/// Other value types (arrays, floating-point numbers, null) are skipped.
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name.append(".").append(object->name.GetString(), object->name.GetStringLength());
+
+  const auto& value = object->value;
+  if (value.IsObject()) {
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(value.GetInt64()));
+  } else if (value.IsString()) {
+    flow_file.addAttribute(name, value.GetString());
+  } else if (value.IsBool()) {
+    // std::to_string(bool) promotes to int and would yield "1"/"0"; keep the JSON spelling.
+    flow_file.addAttribute(name, value.GetBool() ? "true" : "false");
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session->get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(*session, *context, flow_file);
+    if (!elastic_payload) {
+      logger_->log_error(elastic_payload.error().c_str());
+      session->transfer(flow_file, Failure);
+      continue;
+    }
+
+    payload << elastic_payload->toString() << "\n";
+    flowfiles_in_payload.push_back(flow_file);
+  }
+
+  if (flowfiles_in_payload.empty()) {
+    yield();
+    return;
+  }
+
+
+  client_.setPostFields(payload.str());
+  auto result = submitRequest(client_, flowfiles_in_payload.size());
+  if (!result) {
+    logger_->log_error(result.error().c_str());
+    for (const auto& flow_file_in_payload: flowfiles_in_payload)
+      session->transfer(flow_file_in_payload, Failure);
+    return;
+  }
+
+  auto& items = result->operator[]("items");
+  gsl_Expects(items.Size() == flowfiles_in_payload.size());
+  for (size_t i = 0; i < items.Size(); ++i) {
+    for (auto it = items[i].MemberBegin(); it != items[i].MemberEnd(); ++it) {
+      addAttributesFromResponse("elasticsearch", it, *flowfiles_in_payload[i]);
+    }
+    if (items[i].MemberBegin()->value.HasMember("error"))

Review Comment:
   Yes, the response JSON will be flattened and added as attributes.
   The error JSON seems quite informative to me, so I think it's a good idea to include it, e.g.:
   ```
   "update": {
     "_index": "index1",
     "_id": "5",
     "status": 404,
     "error": {
       "type": "document_missing_exception",
       "reason": "[5]: document missing",
       "index_uuid": "aAsFqTI0Tc2W0LCWgPNrOA",
       "shard": "0",
       "index": "index1"
     }
   }
   ```
   
   I've added a test for the error relationship where this behavior can be checked.
   https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0#diff-da99b94efb790428cd1298692800cab09ee2b398538b67ec7f0d5e4bfae2605dR91-R94



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = std::make_shared<PostElasticsearch>("PostElasticsearch");
+  minifi::test::SingleProcessorTestController test_controller{put_elasticsearch_json};
+  auto elasticsearch_credentials_controller_service = test_controller.plan->addController("ElasticsearchCredentialsControllerService", "elasticsearch_credentials_controller_service");
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                     PostElasticsearch::ElasticCredentials.getName(),
+                                     "elasticsearch_credentials_controller_service"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Hosts.getName(),
+                                    "localhost:10433"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Action.getName(),
+                                    "${elastic_action}"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Index.getName(),
+                                    "test_index"));
+
+  SECTION("Index with valid basic authentication") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Password.getName(),
+                                            MockElasticAuthHandler::PASSWORD));
+
+    std::vector<std::tuple<const std::string_view, std::unordered_map<std::string, std::string>>> inputs;
+
+    auto results = test_controller.trigger({std::make_tuple<const std::string_view, std::unordered_map<std::string, std::string>>(R"({"field1":"value1"}")", {{"elastic_action", "index"}}),
+                                            std::make_tuple<const std::string_view, std::unordered_map<std::string, std::string>>(R"({"field1":"value2"}")", {{"elastic_action", "index"}})});
+    REQUIRE(results[PostElasticsearch::Success].size() == 2);
+    for (const auto& result : results[PostElasticsearch::Success]) {
+      auto attributes = result->getAttributes();
+      CHECK(attributes.contains("elasticsearch.index._id"));
+      CHECK(attributes.contains("elasticsearch.index._index"));
+    }
+  }
+
+  SECTION("Update with valid ApiKey") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            MockElasticAuthHandler::API_KEY));
+    CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                            PostElasticsearch::Identifier.getName(),
+                                            "${filename}"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"}")", {{"elastic_action", "upsert"}});
+    REQUIRE(results[PostElasticsearch::Success].size() == 1);
+    auto attributes = results[PostElasticsearch::Success][0]->getAttributes();
+    CHECK(attributes.contains("elasticsearch.update._id"));
+    CHECK(attributes.contains("elasticsearch.update._index"));
+  }
+
+  SECTION("Invalid ApiKey") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            "invalid_api_key"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"}")", {{"elastic_action", "create"}});
+    CHECK(results[PostElasticsearch::Failure].size() == 1);
+  }
+
+  SECTION("Invalid basic authentication") {

Review Comment:
   Good idea, I've added it in https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0#diff-da99b94efb790428cd1298692800cab09ee2b398538b67ec7f0d5e4bfae2605dR80-R96



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+// Relationships a processed flowfile can be routed to.
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+// Bulk operation to perform; evaluated per flowfile (expression language).
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+// Upper bound on the number of flowfiles collected into a single _bulk request per trigger.
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, this property must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+/// Registers the processor's properties and relationships.
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+/// Resolves the "SSL Context Service" property to a controller service.
+/// Yields an empty pointer when the property is unset or the lookup/cast produces nothing.
+std::shared_ptr<minifi::controllers::SSLContextService> getSSLContextService(core::ProcessContext& context) {
+  auto service_name = context.getProperty(PostElasticsearch::SSLContext);
+  if (!service_name)
+    return nullptr;
+  auto service = context.getControllerService(*service_name);
+  return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(service);
+}
+
+/// Resolves the credentials provider property to an ElasticsearchCredentialsControllerService.
+/// Yields an empty pointer when the property is unset or the lookup/cast produces nothing.
+std::shared_ptr<ElasticsearchCredentialsControllerService> getCredentialsService(core::ProcessContext& context) {
+  auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials);
+  if (!service_name)
+    return nullptr;
+  auto service = context.getControllerService(*service_name);
+  return std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(service);
+}
+}  // namespace
+
+/// Reads the scheduling-time configuration and prepares the HTTP client for the "<host>/_bulk" endpoint.
+/// Throws PROCESS_SCHEDULE_EXCEPTION when Max Batch Size is below 1, when Hosts is missing, empty or
+/// lists more than one host, or when the Elasticsearch credentials service cannot be resolved.
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  auto hosts_str = context->getProperty(Hosts);
+  if (!hosts_str)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  auto hosts = utils::StringUtils::split(*hosts_str, ",");
+  if (hosts.size() > 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+  // An empty property value must neither index past the end of `hosts` (if split yields no element)
+  // nor result in a request to the bare "/_bulk" path.
+  if (hosts.empty() || hosts[0].empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  const std::string host_url = hosts[0];
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+/// A single operation of an Elasticsearch _bulk request: an action/metadata header line and,
+/// for every action except "delete", a source line built from the flowfile's JSON content.
+class ElasticPayload {
+ public:
+  /// Serializes the operation as one or two NDJSON lines. No trailing newline is emitted;
+  /// the caller terminates the operation.
+  [[nodiscard]] std::string toString() const {
+    std::string result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      // Append in place instead of concatenating temporaries.
+      result += '\n';
+      result += payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  /// Builds the bulk operation for `flow_file`.
+  ///
+  /// The Action, Index and Identifier properties are evaluated against the flowfile. For
+  /// "index"/"create" the flowfile content is sent as the document; for "update"/"upsert" it is
+  /// wrapped as {"doc": <content>}, and "upsert" is sent as an "update" with "doc_as_upsert": true.
+  ///
+  /// @return the payload, or an error message when the action, index, identifier or content is invalid
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index || index->empty())
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    // Per the Identifier property description, an empty value means "no identifier": index/create
+    // then let Elasticsearch generate one instead of sending an empty "_id"; other actions reject it.
+    if (id && id->empty())
+      id.reset();
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      // Allocates from the payload's allocator so its contents stay valid once moved into the payload.
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // The bulk API has no "upsert" action: it is an "update" whose source line carries
+        // "doc_as_upsert" next to "doc". Inside "doc" it would be stored as a document field.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  /// Builds the action/metadata line, e.g. {"index":{"_index":"my_index","_id":"1"}}.
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;  // bulk action name: "index", "create", "delete" or "update"
+  std::string index_;
+  std::optional<std::string> id_;  // omitted from the header when absent
+  std::optional<rapidjson::Document> payload_;  // source line; absent for "delete"
+};
+
+/// Submits the prepared _bulk request and validates the response: HTTP 200, a JSON object whose
+/// "items" member is an array with exactly `expected_items` entries (one per operation sent).
+/// @return the parsed response document, or an error message describing the first check that failed
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  // Treat the body as a sized buffer: .data() alone does not guarantee a terminating '\0'
+  // (e.g. for a std::vector<char>), so never pass it on as a C string.
+  const auto& response_body = client.getResponseBody();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + std::string(response_body.begin(), response_body.end()));
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  // HasMember/Size assert on values of the wrong type, so check the shape before using them.
+  if (!response.IsObject() || !response.HasMember("items") || !response["items"].IsArray())
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+/// Flattens one member of an Elasticsearch response into flowfile attributes. Nested objects are
+/// descended into with '.'-joined names; integer, string and boolean leaves become attributes.
+/// Other value types (arrays, floating-point numbers, null) are skipped.
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name.append(".").append(object->name.GetString(), object->name.GetStringLength());
+
+  const auto& value = object->value;
+  if (value.IsObject()) {
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(value.GetInt64()));
+  } else if (value.IsString()) {
+    flow_file.addAttribute(name, value.GetString());
+  } else if (value.IsBool()) {
+    // std::to_string(bool) promotes to int and would yield "1"/"0"; keep the JSON spelling.
+    flow_file.addAttribute(name, value.GetBool() ? "true" : "false");
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {

Review Comment:
   Good idea, I've refactored this part in https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+// Relationships and properties exposed by PostElasticsearch.
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, the property must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  // Register the relationships and properties this processor exposes.
+  setSupportedRelationships(relationships());
+  setSupportedProperties(properties());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  // Resolves the controller service named by the SSL Context Service property.
+  // Yields an empty pointer when the property is unset or the service is not an SSLContextService.
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::SSLContext)) {
+    ssl_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*service_name));
+  }
+  return ssl_service;
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  // Resolves the controller service named by the credentials property.
+  // Yields an empty pointer when the property is unset or the service is of another type.
+  std::shared_ptr<ElasticsearchCredentialsControllerService> credentials_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials)) {
+    credentials_service = std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*service_name));
+  }
+  return credentials_service;
+}
+}  // namespace
+
+/// Reads the batch size, the (single) host and the credentials/SSL services, then prepares the
+/// HTTP client for POSTing to "<host>/_bulk". Throws PROCESS_SCHEDULE_EXCEPTION on invalid configuration.
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+    // An empty Hosts value may split into no elements (indexing would be UB) or into one empty element
+    if (hosts.empty() || hosts[0].empty())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+/// One item of an Elasticsearch bulk request: an action/metadata line plus an optional source line.
+class ElasticPayload {
+ public:
+  /// Serializes the item as the header line, followed by "\n" and the payload line when there is
+  /// a payload. No trailing newline is appended.
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result += '\n';
+      result += payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  /// Builds a bulk item for flow_file from the Action, Index and Identifier properties (evaluated
+  /// against the flow file) and, for every action except "delete", from the flow file content.
+  /// @return the item, or an error message when a property is missing/invalid or the content cannot be read
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    // An Identifier that evaluates to an empty string is treated as unset. Elasticsearch then
+    // generates the id for index/create, and the required-id check below still applies.
+    if (id && id->empty())
+      id = std::nullopt;
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      // The flow file content is read into the payload document, which becomes the source line.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      // Partial update: the flow file content becomes the "doc" member of the source line.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // An upsert is sent as an update whose "doc_as_upsert" flag is a sibling of "doc", not a field inside it.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  /// Builds the action/metadata line: {"<operation>": {"_index": "<index>"[, "_id": "<id>"]}}
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;  // bulk action name; "upsert" is stored as "update"
+  std::string index_;
+  std::optional<std::string> id_;  // unset when Elasticsearch should generate the id
+  std::optional<rapidjson::Document> payload_;  // source line; unset for "delete"
+};
+
+/// Submits the prepared bulk request and validates the response.
+/// @param client         HTTP client that already holds the request body
+/// @param expected_items number of bulk items sent; the response must contain exactly as many
+/// @return the parsed response document, or an error message describing why it was rejected
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  // The response body is a raw byte buffer (data()/size()); it is not assumed to be null-terminated,
+  // so it is always consumed together with its size.
+  const auto& response_body = client.getResponseBody();
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + std::string(response_body.begin(), response_body.end()));
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  // HasMember()/Size() assert on the wrong JSON types, so check the shape before using them
+  if (!response.IsObject() || !response.HasMember("items") || !response["items"].IsArray())
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+/// Flattens one member of a response object into flow file attributes named "<name>.<key>[.<nested key>...]".
+/// Nested objects are recursed into; strings, booleans and numbers become attribute values.
+/// Null and array values are not converted to attributes.
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name += ".";
+  name += object->name.GetString();
+  const auto& value = object->value;
+
+  if (value.IsObject()) {
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(value.GetInt64()));
+  } else if (value.IsNumber()) {
+    // Numbers outside the int64 range (large unsigned values) and floating point values are
+    // serialized with the JSON writer so they keep their JSON textual form.
+    rapidjson::StringBuffer number_buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> number_writer(number_buffer);
+    value.Accept(number_writer);
+    flow_file.addAttribute(name, number_buffer.GetString());
+  } else if (value.IsString()) {
+    flow_file.addAttribute(name, value.GetString());
+  } else if (value.IsBool()) {
+    // std::to_string(bool) yields "1" / "0"
+    flow_file.addAttribute(name, std::to_string(value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session->get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(*session, *context, flow_file);
+    if (!elastic_payload) {
+      logger_->log_error(elastic_payload.error().c_str());
+      session->transfer(flow_file, Failure);
+      continue;
+    }
+
+    payload << elastic_payload->toString() << "\n";
+    flowfiles_in_payload.push_back(flow_file);
+  }
+
+  if (flowfiles_in_payload.empty()) {
+    yield();
+    return;
+  }
+
+
+  client_.setPostFields(payload.str());
+  auto result = submitRequest(client_, flowfiles_in_payload.size());
+  if (!result) {
+    logger_->log_error(result.error().c_str());
+    for (const auto& flow_file_in_payload: flowfiles_in_payload)
+      session->transfer(flow_file_in_payload, Failure);
+    return;
+  }
+
+  auto& items = result->operator[]("items");

Review Comment:
   refactored this part in https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+// Relationships and properties exposed by PostElasticsearch.
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, the property must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  // Register the relationships and properties this processor exposes.
+  setSupportedRelationships(relationships());
+  setSupportedProperties(properties());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  // Resolves the controller service named by the SSL Context Service property.
+  // Yields an empty pointer when the property is unset or the service is not an SSLContextService.
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::SSLContext)) {
+    ssl_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*service_name));
+  }
+  return ssl_service;
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  // Resolves the controller service named by the credentials property.
+  // Yields an empty pointer when the property is unset or the service is of another type.
+  std::shared_ptr<ElasticsearchCredentialsControllerService> credentials_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials)) {
+    credentials_service = std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*service_name));
+  }
+  return credentials_service;
+}
+}  // namespace
+
+/// Reads the batch size, the (single) host and the credentials/SSL services, then prepares the
+/// HTTP client for POSTing to "<host>/_bulk". Throws PROCESS_SCHEDULE_EXCEPTION on invalid configuration.
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+    // An empty Hosts value may split into no elements (indexing would be UB) or into one empty element
+    if (hosts.empty() || hosts[0].empty())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+/// One item of an Elasticsearch bulk request: an action/metadata line plus an optional source line.
+class ElasticPayload {
+ public:
+  /// Serializes the item as the header line, followed by "\n" and the payload line when there is
+  /// a payload. No trailing newline is appended.
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result += '\n';
+      result += payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  /// Builds a bulk item for flow_file from the Action, Index and Identifier properties (evaluated
+  /// against the flow file) and, for every action except "delete", from the flow file content.
+  /// @return the item, or an error message when a property is missing/invalid or the content cannot be read
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    // An Identifier that evaluates to an empty string is treated as unset. Elasticsearch then
+    // generates the id for index/create, and the required-id check below still applies.
+    if (id && id->empty())
+      id = std::nullopt;
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      // The flow file content is read into the payload document, which becomes the source line.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      // Partial update: the flow file content becomes the "doc" member of the source line.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // An upsert is sent as an update whose "doc_as_upsert" flag is a sibling of "doc", not a field inside it.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  /// Builds the action/metadata line: {"<operation>": {"_index": "<index>"[, "_id": "<id>"]}}
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;  // bulk action name; "upsert" is stored as "update"
+  std::string index_;
+  std::optional<std::string> id_;  // unset when Elasticsearch should generate the id
+  std::optional<rapidjson::Document> payload_;  // source line; unset for "delete"
+};
+
+/// Submits the prepared bulk request and validates the response.
+/// @param client         HTTP client that already holds the request body
+/// @param expected_items number of bulk items sent; the response must contain exactly as many
+/// @return the parsed response document, or an error message describing why it was rejected
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  // The response body is a raw byte buffer (data()/size()); it is not assumed to be null-terminated,
+  // so it is always consumed together with its size.
+  const auto& response_body = client.getResponseBody();
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + std::string(response_body.begin(), response_body.end()));
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  // HasMember()/Size() assert on the wrong JSON types, so check the shape before using them
+  if (!response.IsObject() || !response.HasMember("items") || !response["items"].IsArray())
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }

Review Comment:
   It shouldn't happen, but a log message is probably a good idea.
   https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0#diff-bdbf058968be4e5a837bb513eb74292c24911b773939d6b3404a7c5e7b9a609dR251



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+// Relationships and properties exposed by PostElasticsearch.
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, the property must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  // Register the relationships and properties this processor exposes.
+  setSupportedRelationships(relationships());
+  setSupportedProperties(properties());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  // Resolves the controller service named by the SSL Context Service property.
+  // Yields an empty pointer when the property is unset or the service is not an SSLContextService.
+  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::SSLContext)) {
+    ssl_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*service_name));
+  }
+  return ssl_service;
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  // Resolves the controller service named by the credentials property.
+  // Yields an empty pointer when the property is unset or the service is of another type.
+  std::shared_ptr<ElasticsearchCredentialsControllerService> credentials_service;
+  if (auto service_name = context.getProperty(PostElasticsearch::ElasticCredentials)) {
+    credentials_service = std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*service_name));
+  }
+  return credentials_service;
+}
+}  // namespace
+
+/// Reads the batch size, the (single) host and the credentials/SSL services, then prepares the
+/// HTTP client for POSTing to "<host>/_bulk". Throws PROCESS_SCHEDULE_EXCEPTION on invalid configuration.
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+    // An empty Hosts value may split into no elements (indexing would be UB) or into one empty element
+    if (hosts.empty() || hosts[0].empty())
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+/// One action of an Elasticsearch _bulk request: an action/metadata header line, optionally
+/// followed by a source line (index/create: the flow file's JSON; update/upsert: {"doc": ...}).
+class ElasticPayload {
+ public:
+  /// Renders this action as bulk request text: the header line, plus "\n" and the source line
+  /// when a payload is present. No trailing newline is appended here.
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result += '\n';
+      result.append(payload_buffer.GetString(), payload_buffer.GetSize());
+    }
+    return result;
+  }
+
+  /// Builds the bulk action for @p flow_file from the Action, Index and Identifier properties
+  /// (evaluated with the flow file) and, for index/create/update/upsert, its JSON content.
+  /// @return the payload, or an error message if a property is missing/invalid or the flow file
+  ///         content cannot be used as the request source.
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      // The flow file content, parsed as JSON, becomes the document source.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      // Partial update: the content becomes the "doc" member of the source line.
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      // Uses the payload's allocator so the parsed content can be moved into it below.
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      // "doc" must be a JSON object; anything else is rejected here instead of being sent.
+      if (session.read(flow_file, std::ref(callback)) < 0 || !doc_member.IsObject()) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+      if (action == "upsert") {
+        // Elasticsearch expects "doc_as_upsert" next to "doc" in the source line; inside the
+        // document it would just be stored as a regular field and no upsert would happen.
+        action = "update";
+        payload->AddMember("doc_as_upsert", true, payload->GetAllocator());
+      }
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  /// Builds the action/metadata line, e.g. {"index":{"_index":"my_index","_id":"1"}};
+  /// "_id" is included only when an identifier was given.
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;  // bulk action name for the header ("upsert" is already mapped to "update")
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;  // source line; empty for "delete"
+};
+
+/// Sends the prepared _bulk request and validates the response envelope.
+/// @param client HTTP client already configured for the _bulk endpoint with the request body set.
+/// @param expected_items number of actions in the request; the response must list one item per action.
+/// @return the parsed response document, or a description of why the request/response is unusable.
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  // The body is always used with an explicit size: its buffer is not assumed to be NUL-terminated.
+  const auto& response_body = client.getResponseBody();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + std::to_string(response_code) + ", " + std::string(response_body.data(), response_body.size()));
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = response.Parse<rapidjson::kParseStopWhenDoneFlag>(response_body.data(), response_body.size());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  // Check the JSON types first: HasMember()/Size() on a value of the wrong type trips RAPIDJSON_ASSERT,
+  // and the response shape is controlled by the server.
+  if (!response.IsObject() || !response.HasMember("items") || !response["items"].IsArray())
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the number of requests");
+
+  return response;
+}
+
+/// Flattens one member of a response JSON object into flow file attributes.
+/// The attribute name is "<name>.<member key>"; nested objects recurse with further dotted keys.
+/// Strings are copied as-is; numbers and booleans are written in their JSON text form
+/// (e.g. "42", "0.5", "true"). Nulls and arrays produce no attribute.
+void addAttributesFromResponse(std::string name, rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name += '.';
+  name.append(object->name.GetString(), object->name.GetStringLength());
+
+  const auto& value = object->value;
+  if (value.IsObject()) {
+    for (auto it = value.MemberBegin(); it != value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (value.IsString()) {
+    flow_file.addAttribute(name, std::string(value.GetString(), value.GetStringLength()));
+  } else if (value.IsNumber() || value.IsBool()) {
+    // Serialize through rapidjson so signed/unsigned integers, doubles and booleans all get their
+    // JSON text; std::to_string(bool) would yield "1"/"0" and doubles were previously dropped.
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    value.Accept(writer);
+    flow_file.addAttribute(name, std::string(buffer.GetString(), buffer.GetSize()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;

Review Comment:
   makes sense, changed it in https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/106414c9218d5c359bb4c10bf19ab81b576757c0#diff-bdbf058968be4e5a837bb513eb74292c24911b773939d6b3404a7c5e7b9a609dR294



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org