You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/06/26 01:56:40 UTC

[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1349: MINIFICPP-1843 Implement PostElasticsearch

szaszm commented on code in PR #1349:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1349#discussion_r906742608


##########
docker/test/integration/resources/elasticsearch/Dockerfile:
##########
@@ -0,0 +1,7 @@
+FROM elasticsearch:8.2.2
+COPY elasticsearch.yml /usr/share/elasticsearch/config/elasticsearch.yml
+COPY certs/elastic_cert.key /usr/share/elasticsearch/config/certs/elastic_cert.key
+COPY certs/elastic_cert.pem /usr/share/elasticsearch/config/certs/elastic_cert.pem
+COPY certs/elastic_transport.key /usr/share/elasticsearch/config/certs/elastic_transport.key
+COPY certs/elastic_transport.pem /usr/share/elasticsearch/config/certs/elastic_transport.pem

Review Comment:
   Both the certificates and the private keys are in PEM format. Maybe using the .key.pem and .crt.pem, or the .key and .crt, extensions would be less confusing.



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = std::make_shared<PostElasticsearch>("PostElasticsearch");
+  minifi::test::SingleProcessorTestController test_controller{put_elasticsearch_json};
+  auto elasticsearch_credentials_controller_service = test_controller.plan->addController("ElasticsearchCredentialsControllerService", "elasticsearch_credentials_controller_service");
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                     PostElasticsearch::ElasticCredentials.getName(),
+                                     "elasticsearch_credentials_controller_service"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Hosts.getName(),
+                                    "localhost:10433"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Action.getName(),
+                                    "${elastic_action}"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Index.getName(),
+                                    "test_index"));
+
+  SECTION("Index with valid basic authentication") {
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            ElasticsearchCredentialsControllerService::Password.getName(),
+                                            MockElasticAuthHandler::PASSWORD));
+
+    std::vector<std::tuple<const std::string_view, std::unordered_map<std::string, std::string>>> inputs;

Review Comment:
   `inputs` looks unused to me.



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = std::make_shared<PostElasticsearch>("PostElasticsearch");

Review Comment:
   [ES.11: Use auto to avoid redundant repetition of type names](https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#es11-use-auto-to-avoid-redundant-repetition-of-type-names)



##########
PROCESSORS.md:
##########
@@ -1640,6 +1641,35 @@ In the list below, the names of required properties appear in bold. Any other pr
 | success | All files are routed to success |
 
 
+## PostElasticsearch
+
+### Description
+
+An Elasticsearch/Opensearch post processor that uses the Elasticsearch/Opensearch _bulk REST API.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                           | Default Value | Allowable Values | Description                                                                                                                                                                                                                                                                                               |
+|------------------------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Action                                         |               |                  | The type of the operation used to index (create, delete, index, update, upsert)<br/>**Supports Expression Language: true**                                                                                                                                                                                |
+| Max Batch Size                                 | 100           |                  | The maximum number of Syslog events to process at a time.                                                                                                                                                                                                                                                 |

Review Comment:
   syslog events?
   ```suggestion
   | Max Batch Size                                 | 100           |                  | The maximum number of flow files to process at a time.                                                                                                                                                                                                                                                 |
   ```



##########
extensions/http-curl/client/HTTPClient.h:
##########
@@ -297,6 +301,8 @@ class HTTPClient : public BaseHTTPClient, public core::Connectable {
 
   std::chrono::milliseconds keep_alive_idle_{-1};
 
+  std::optional<std::pair<std::string, std::string>> username_password_;

Review Comment:
   Use a proper named type over a `std::pair`. Something like `struct BasicAuthCredentials { std::string username; std::string password; };`



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = context.getProperty(PostElasticsearch::ElasticCredentials))
+    return std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) -> nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != "delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == "upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, &payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, std::string&& payload, const size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {

Review Comment:
   What's the purpose of using a trailing return type here? Usually I prefer a leading return type, except if the return type expression needs to reference the parameters (or it's easier to express it that way).



##########
libminifi/test/SingleProcessorTestController.h:
##########
@@ -57,6 +58,14 @@ class SingleProcessorTestController : public TestController {
     return trigger();
   }
 
+  auto trigger(std::initializer_list<std::tuple<const std::string_view, std::unordered_map<std::string, std::string>>> flow_files) {

Review Comment:
   Use a proper container instead, like `std::array` or `std::vector`.
   
   Use a dedicated type for flow file data representation over a tuple. Could be a dead simple aggregate struct, just give it a name.



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles that fail for reasons unrelated to server availability go to this relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a time.")

Review Comment:
   "Syslog events" again, but I think this processor is more general: the batch size applies to flow files, not just syslog events.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org